Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/08/27 17:54:46 UTC

[hadoop] branch trunk updated: Revert "HDDS-1610. applyTransaction failure should not be lost on restart. Contributed by Shashikant Banerjee."

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ce8eb12  Revert "HDDS-1610. applyTransaction failure should not be lost on restart. Contributed by Shashikant Banerjee."
ce8eb12 is described below

commit ce8eb1283acbebb990a4f1e40848d78700309222
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Tue Aug 27 23:23:44 2019 +0530

    Revert "HDDS-1610. applyTransaction failure should not be lost on restart. Contributed by Shashikant Banerjee."
    
    This reverts commit 62445021d5d57b0d49adcb1bd4365c13532328fc as it has unintended changes in the DirectoryWithSnapshotFeature class.
---
 .../server/ratis/ContainerStateMachine.java        |  84 ++++-------
 .../transport/server/ratis/XceiverServerRatis.java |   9 --
 .../proto/StorageContainerDatanodeProtocol.proto   |   1 -
 .../snapshot/DirectoryWithSnapshotFeature.java     |   4 +-
 .../rpc/TestContainerStateMachineFailures.java     | 156 ++++++---------------
 .../ozone/container/ContainerTestHelper.java       |  16 ---
 .../freon/TestFreonWithDatanodeFastRestart.java    |  17 ++-
 7 files changed, 80 insertions(+), 207 deletions(-)
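
(For context, the unintended change cited in the commit message is the
DirectoryWithSnapshotFeature.java hunk further down: HDDS-1610 had
accidentally commented out the created-list cleanup, and this revert
restores it. Excerpted from that hunk, the restored branch is:)

    if (currentINode.isLastReference()) {
      // if this is the last reference, the created list can be destroyed.
      priorDiff.getChildrenDiff().destroyCreatedList(
          reclaimContext, currentINode);
    }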

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index aadec8d..f4d4744 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -84,7 +83,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -149,7 +147,6 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final Cache<Long, ByteString> stateMachineDataCache;
   private final boolean isBlockTokenEnabled;
   private final TokenVerifier tokenVerifier;
-  private final AtomicBoolean isStateMachineHealthy;
 
   private final Semaphore applyTransactionSemaphore;
   /**
@@ -187,7 +184,6 @@ public class ContainerStateMachine extends BaseStateMachine {
         ScmConfigKeys.
             DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
     applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
-    isStateMachineHealthy = new AtomicBoolean(true);
     this.executors = new ExecutorService[numContainerOpExecutors];
     for (int i = 0; i < numContainerOpExecutors; i++) {
       final int index = i;
@@ -269,14 +265,6 @@ public class ContainerStateMachine extends BaseStateMachine {
   public long takeSnapshot() throws IOException {
     TermIndex ti = getLastAppliedTermIndex();
     long startTime = Time.monotonicNow();
-    if (!isStateMachineHealthy.get()) {
-      String msg =
-          "Failed to take snapshot " + " for " + gid + " as the stateMachine"
-              + " is unhealthy. The last applied index is at " + ti;
-      StateMachineException sme = new StateMachineException(msg);
-      LOG.error(msg);
-      throw sme;
-    }
     if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
       final File snapshotFile =
           storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
@@ -287,12 +275,12 @@ public class ContainerStateMachine extends BaseStateMachine {
         // make sure the snapshot file is synced
         fos.getFD().sync();
       } catch (IOException ioe) {
-        LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti,
+        LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti,
             snapshotFile);
         throw ioe;
       }
-      LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}", gid, ti,
-          snapshotFile, (Time.monotonicNow() - startTime));
+      LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}",
+          gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
       return ti.getIndex();
     }
     return -1;
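
(Aside: the "make sure the snapshot file is synced" comment above refers to
the usual write-then-fsync pattern; most of the try-with-resources block
sits outside this hunk. A generic sketch, where the byte-array write is a
placeholder for the actual serialization call, not the method used in
ContainerStateMachine:)

    // Generic write-then-fsync sketch.
    // (imports: java.io.File, java.io.FileOutputStream, java.io.IOException)
    static void writeSnapshot(File snapshotFile, byte[] contents)
        throws IOException {
      try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
        fos.write(contents);   // placeholder for the real serialization
        fos.flush();
        // getFD().sync() forces the bytes to stable storage, so a crash
        // after takeSnapshot() returns cannot lose the snapshot file.
        fos.getFD().sync();
      }
    }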
@@ -397,12 +385,17 @@ public class ContainerStateMachine extends BaseStateMachine {
     return response;
   }
 
-  private ContainerCommandResponseProto runCommand(
+  private ContainerCommandResponseProto runCommandGetResponse(
       ContainerCommandRequestProto requestProto,
       DispatcherContext context) {
     return dispatchCommand(requestProto, context);
   }
 
+  private Message runCommand(ContainerCommandRequestProto requestProto,
+      DispatcherContext context) {
+    return runCommandGetResponse(requestProto, context)::toByteString;
+  }
+
   private ExecutorService getCommandExecutor(
       ContainerCommandRequestProto requestProto) {
     int executorId = (int)(requestProto.getContainerID() % executors.length);
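
(Aside: the restored runCommand compiles because Ratis's Message is, in
effect, a functional interface with a single abstract method returning a
ByteString, so a bound method reference with a matching shape can stand in
for it. A minimal, dependency-free illustration of the idiom, with Msg and
Resp as stand-ins for the real Ratis and protobuf types:)

    // Self-contained sketch of the method-reference idiom above.
    class MessageRefDemo {
      interface Msg {                      // stand-in for Ratis Message
        String getContent();               // Ratis returns ByteString
      }
      static class Resp {                  // stand-in for the response proto
        String toText() { return "response-bytes"; }
      }
      static Msg asMessage(Resp resp) {
        // A bound, no-arg method reference whose return type matches the
        // single abstract method converts directly, exactly as
        // resp::toByteString does for Message in the hunk above.
        return resp::toText;
      }
      public static void main(String[] args) {
        System.out.println(asMessage(new Resp()).getContent());
      }
    }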
@@ -432,7 +425,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     // thread.
     CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
         CompletableFuture.supplyAsync(() ->
-            runCommand(requestProto, context), chunkExecutor);
+            runCommandGetResponse(requestProto, context), chunkExecutor);
 
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
 
@@ -509,8 +502,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       metrics.incNumQueryStateMachineOps();
       final ContainerCommandRequestProto requestProto =
           getContainerCommandRequestProto(request.getContent());
-      return CompletableFuture
-          .completedFuture(runCommand(requestProto, null)::toByteString);
+      return CompletableFuture.completedFuture(runCommand(requestProto, null));
     } catch (IOException e) {
       metrics.incNumQueryStateMachineFails();
       return completeExceptionally(e);
@@ -682,58 +674,30 @@ public class ContainerStateMachine extends BaseStateMachine {
       if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
         builder.setCreateContainerSet(createContainerSet);
       }
-      CompletableFuture<Message> applyTransactionFuture =
-          new CompletableFuture<>();
       // Ensure the command gets executed in a separate thread from the
       // stateMachineUpdater thread, which is calling applyTransaction here.
-      CompletableFuture<ContainerCommandResponseProto> future =
-          CompletableFuture.supplyAsync(
-              () -> runCommand(requestProto, builder.build()),
+      CompletableFuture<Message> future = CompletableFuture
+          .supplyAsync(() -> runCommand(requestProto, builder.build()),
               getCommandExecutor(requestProto));
-      future.thenApply(r -> {
+
+      future.thenAccept(m -> {
         if (trx.getServerRole() == RaftPeerRole.LEADER) {
           long startTime = (long) trx.getStateMachineContext();
           metrics.incPipelineLatency(cmdType,
               Time.monotonicNowNanos() - startTime);
         }
-        if (r.getResult() != ContainerProtos.Result.SUCCESS) {
-          StorageContainerException sce =
-              new StorageContainerException(r.getMessage(), r.getResult());
-          LOG.error(
-              "gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
-                  + "{} Container Result: {}", gid, r.getCmdType(), index,
-              r.getMessage(), r.getResult());
-          metrics.incNumApplyTransactionsFails();
-          // Since the applyTransaction now is completed exceptionally,
-          // before any further snapshot is taken, the exception will be
-          // caught in stateMachineUpdater in Ratis and the Ratis server
-          // will shut down.
-          applyTransactionFuture.completeExceptionally(sce);
-          isStateMachineHealthy.compareAndSet(true, false);
-          ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
-        } else {
-          LOG.debug(
-              "gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
-                  + "{} Container Result: {}", gid, r.getCmdType(), index,
-              r.getMessage(), r.getResult());
-          applyTransactionFuture.complete(r::toByteString);
-          if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
-            metrics.incNumBytesCommittedCount(
-                requestProto.getWriteChunk().getChunkData().getLen());
-          }
-          // add the entry to the applyTransactionCompletionMap only if the
-          // stateMachine is healthy, i.e., there have been no
-          // applyTransaction failures before.
-          if (isStateMachineHealthy.get()) {
-            final Long previous = applyTransactionCompletionMap
+
+        final Long previous =
+            applyTransactionCompletionMap
                 .put(index, trx.getLogEntry().getTerm());
-            Preconditions.checkState(previous == null);
-            updateLastApplied();
-          }
+        Preconditions.checkState(previous == null);
+        if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
+          metrics.incNumBytesCommittedCount(
+              requestProto.getWriteChunk().getChunkData().getLen());
         }
-        return applyTransactionFuture;
+        updateLastApplied();
       }).whenComplete((r, t) -> applyTransactionSemaphore.release());
-      return applyTransactionFuture;
+      return future;
     } catch (IOException | InterruptedException e) {
       metrics.incNumApplyTransactionsFails();
       return completeExceptionally(e);
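
(Behaviorally, this hunk is the heart of the revert: applyTransaction no
longer inspects the container result, so a command that fails inside the
container still completes the returned future normally. A hedged sketch of
what a consumer of that future observes after the revert, using the Ratis
StateMachine API; stateMachine and trx are assumed to be in scope:)

    // Sketch only: after the revert, a non-SUCCESS container result still
    // flows back as an ordinary Message; no StorageContainerException is
    // raised, no health flag trips, and takeSnapshot() proceeds as usual.
    stateMachine.applyTransaction(trx)
        .thenAccept(msg -> {
          // msg wraps the serialized response, success or failure alike.
        })
        .exceptionally(t -> {
          // Reached only for IOException/InterruptedException thrown while
          // setting up the command, not for a failed container result.
          return null;
        });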
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index b4021cf..f0ed28b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -609,15 +609,6 @@ public final class XceiverServerRatis extends XceiverServer {
     handlePipelineFailure(groupId, roleInfoProto);
   }
 
-  void handleApplyTransactionFailure(RaftGroupId groupId,
-      RaftProtos.RaftPeerRole role) {
-    UUID dnId = RatisHelper.toDatanodeId(getServer().getId());
-    String msg =
-        "Ratis Transaction failure in datanode " + dnId + " with role " + role
-            + " .Triggering pipeline close action.";
-    triggerPipelineClose(groupId, msg,
-        ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true);
-  }
   /**
   * The snapshot contents cannot be used to actually catch up
   * the follower; that is the reason to initiate close pipeline and
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 1d09dfa..500735a 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -214,7 +214,6 @@ message ClosePipelineInfo {
   enum Reason {
     PIPELINE_FAILED = 1;
     PIPELINE_LOG_FAILED = 2;
-    STATEMACHINE_TRANSACTION_FAILED = 3;
   }
   required PipelineID pipelineID = 1;
   optional Reason reason = 3;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
index ffbc174..7fb639c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
@@ -742,8 +742,8 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
             if (currentINode.isLastReference()) {
               // if this is the last reference, the created list can be
               // destroyed.
-         //     priorDiff.getChildrenDiff().destroyCreatedList(
-         //         reclaimContext, currentINode);
+              priorDiff.getChildrenDiff().destroyCreatedList(
+                  reclaimContext, currentINode);
             } else {
               // we only check the node originally in prior's created list
               for (INode cNode : priorDiff.diff.getCreatedUnmodifiable()) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 86621d6..469eeb0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -22,32 +22,23 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.protocol.RaftRetryFailureException;
-import org.apache.ratis.protocol.StateMachineException;
-import org.apache.ratis.server.storage.FileInfo;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -55,7 +46,6 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -64,8 +54,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    ContainerDataProto.State.UNHEALTHY;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -88,7 +77,7 @@ public class TestContainerStateMachineFailures {
   private static String volumeName;
   private static String bucketName;
   private static String path;
-  private static XceiverClientManager xceiverClientManager;
+  private static int chunkSize;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -112,11 +101,6 @@ public class TestContainerStateMachineFailures {
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
         TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
-    conf.setTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
-        1, TimeUnit.SECONDS);
-    conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
     conf.setQuietMode(false);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
@@ -125,7 +109,6 @@ public class TestContainerStateMachineFailures {
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
     objectStore = client.getObjectStore();
-    xceiverClientManager = new XceiverClientManager(conf);
     volumeName = "testcontainerstatemachinefailures";
     bucketName = volumeName;
     objectStore.createVolume(volumeName);
@@ -149,10 +132,19 @@ public class TestContainerStateMachineFailures {
             .createKey("ratis", 1024, ReplicationType.RATIS,
                 ReplicationFactor.ONE, new HashMap<>());
     byte[] testData = "ratis".getBytes();
+    long written = 0;
     // First write and flush creates a container in the datanode
     key.write(testData);
+    written += testData.length;
     key.flush();
     key.write(testData);
+    written += testData.length;
+
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
+        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
+        .build();
     KeyOutputStream groupOutputStream =
         (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
@@ -165,14 +157,7 @@ public class TestContainerStateMachineFailures {
             .getContainer().getContainerSet()
             .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
             .getContainerPath()));
-    try {
-      // there is only 1 datanode in the pipeline, the pipeline will be closed
-      // and allocation to new pipeline will fail as there is no other dn in
-      // the cluster
-      key.close();
-    } catch(IOException ioe) {
-      Assert.assertTrue(ioe instanceof OMException);
-    }
+    key.close();
     long containerID = omKeyLocationInfo.getContainerID();
 
     // Make sure the container is marked unhealthy
@@ -194,6 +179,22 @@ public class TestContainerStateMachineFailures {
         .getDatanodeStateMachine().getContainer();
     Assert
         .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
+
+    OzoneKeyDetails keyDetails = objectStore.getVolume(volumeName)
+        .getBucket(bucketName).getKey("ratis");
+
+    /**
+     * Ensure length of data stored in key is equal to number of bytes written.
+     */
+    Assert.assertTrue("Number of bytes stored in the key is not equal " +
+        "to number of bytes written.", keyDetails.getDataSize() == written);
+
+    /**
+     * Pending data from the second write should get written to a new container
+     * during key.close() because the first container is UNHEALTHY by that time
+     */
+    Assert.assertTrue("Expect Key to be stored in 2 separate containers",
+        keyDetails.getOzoneKeyLocations().size() == 2);
   }
 
   @Test
@@ -206,6 +207,12 @@ public class TestContainerStateMachineFailures {
     key.write("ratis".getBytes());
     key.flush();
     key.write("ratis".getBytes());
+
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
+        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
+        .build();
     KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
         groupOutputStream.getLocationInfoList();
@@ -221,14 +228,8 @@ public class TestContainerStateMachineFailures {
         (KeyValueContainerData) containerData;
     // delete the container db file
     FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
-    try {
-      // there is only 1 datanode in the pipeline, the pipeline will be closed
-      // and allocation to new pipeline will fail as there is no other dn in
-      // the cluster
-      key.close();
-    } catch(IOException ioe) {
-      Assert.assertTrue(ioe instanceof OMException);
-    }
+
+    key.close();
 
     long containerID = omKeyLocationInfo.getContainerID();
 
@@ -269,83 +270,4 @@ public class TestContainerStateMachineFailures {
     Assert.assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY,
         dispatcher.dispatch(request.build(), null).getResult());
   }
-
-  @Test
-  public void testApplyTransactionFailure() throws Exception {
-    OzoneOutputStream key =
-        objectStore.getVolume(volumeName).getBucket(bucketName)
-            .createKey("ratis", 1024, ReplicationType.RATIS,
-                ReplicationFactor.ONE, new HashMap<>());
-    // First write and flush creates a container in the datanode
-    key.write("ratis".getBytes());
-    key.flush();
-    key.write("ratis".getBytes());
-    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
-    List<OmKeyLocationInfo> locationInfoList =
-        groupOutputStream.getLocationInfoList();
-    Assert.assertEquals(1, locationInfoList.size());
-    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
-    ContainerData containerData =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet()
-            .getContainer(omKeyLocationInfo.getContainerID())
-            .getContainerData();
-    Assert.assertTrue(containerData instanceof KeyValueContainerData);
-    KeyValueContainerData keyValueContainerData =
-        (KeyValueContainerData) containerData;
-    key.close();
-    ContainerStateMachine stateMachine =
-        (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
-    SimpleStateMachineStorage storage =
-        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
-    Path parentPath = storage.findLatestSnapshot().getFile().getPath();
-    // Since the snapshot threshold is set to 1 and applyTransactions
-    // have occurred, we should see snapshots
-    Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
-    FileInfo snapshot = storage.findLatestSnapshot().getFile();
-    Assert.assertNotNull(snapshot);
-    long containerID = omKeyLocationInfo.getContainerID();
-    // delete the container db file
-    FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
-    Pipeline pipeline = cluster.getStorageContainerLocationClient()
-        .getContainerWithPipeline(containerID).getPipeline();
-    XceiverClientSpi xceiverClient =
-        xceiverClientManager.acquireClient(pipeline);
-    ContainerProtos.ContainerCommandRequestProto.Builder request =
-        ContainerProtos.ContainerCommandRequestProto.newBuilder();
-    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
-    request.setCmdType(ContainerProtos.Type.CloseContainer);
-    request.setContainerID(containerID);
-    request.setCloseContainer(
-        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
-    // close container transaction will fail over Ratis and will initiate
-    // a pipeline close action
-
-    // Since the applyTransaction failure is propagated to Ratis,
-    // stateMachineUpdater will hit the exception while taking the next
-    // snapshot and should shut down the RaftServerImpl. The client request
-    // will fail with RaftRetryFailureException.
-    try {
-      xceiverClient.sendCommand(request.build());
-      Assert.fail("Expected exception not thrown");
-    } catch (IOException e) {
-      Assert.assertTrue(HddsClientUtils
-          .checkForException(e) instanceof RaftRetryFailureException);
-    }
-    // Make sure the container is marked unhealthy
-    Assert.assertTrue(
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet().getContainer(containerID)
-            .getContainerState()
-            == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
-    try {
-      // try to take a new snapshot, ideally it should just fail
-      stateMachine.takeSnapshot();
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe instanceof StateMachineException);
-    }
-    // Make sure the latest snapshot is same as the previous one
-    FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
-    Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
-  }
 }
\ No newline at end of file
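
(The removed testApplyTransactionFailure exercised the reverted behavior
end to end: with the snapshot threshold at 1, it deleted the container
directory, sent a CloseContainer command expected to fail with a
RaftRetryFailureException on the client, verified the container was marked
UNHEALTHY, and checked that a subsequent takeSnapshot() threw a
StateMachineException, leaving the latest snapshot unchanged.)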
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 82d34d7..4da1907 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -73,10 +73,6 @@ import org.apache.hadoop.security.token.Token;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -870,16 +866,4 @@ public final class ContainerTestHelper {
       index++;
     }
   }
-
-  public static StateMachine getStateMachine(MiniOzoneCluster cluster)
-      throws Exception {
-    XceiverServerSpi server =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
-            getContainer().getWriteChannel();
-    RaftServerProxy proxy =
-        (RaftServerProxy) (((XceiverServerRatis) server).getServer());
-    RaftGroupId groupId = proxy.getGroupIds().iterator().next();
-    RaftServerImpl impl = proxy.getImpl(groupId);
-    return impl.getStateMachine();
-  }
 }
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
index 545f2b3..8ed3960 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
@@ -22,7 +22,13 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.transport
+    .server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+    .XceiverServerRatis;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
@@ -121,6 +127,13 @@ public class TestFreonWithDatanodeFastRestart {
   }
 
   private StateMachine getStateMachine() throws Exception {
-    return ContainerTestHelper.getStateMachine(cluster);
+    XceiverServerSpi server =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
+            getContainer().getWriteChannel();
+    RaftServerProxy proxy =
+        (RaftServerProxy)(((XceiverServerRatis)server).getServer());
+    RaftGroupId groupId = proxy.getGroupIds().iterator().next();
+    RaftServerImpl impl = proxy.getImpl(groupId);
+    return impl.getStateMachine();
   }
 }
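
(The revert inlines getStateMachine() back into this test, now that the
shared ContainerTestHelper.getStateMachine helper is removed above. Judging
from the TermIndex and SimpleStateMachineStorage imports, the test
presumably uses it along these lines inside a test method; this is assumed
usage, not part of the diff:)

    // Assumed usage sketch, based only on the imports this diff touches.
    StateMachine sm = getStateMachine();
    SimpleStateMachineStorage storage =
        (SimpleStateMachineStorage) sm.getStateMachineStorage();
    TermIndex lastApplied = sm.getLastAppliedTermIndex();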

