Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/08/20 09:07:22 UTC

[hadoop] branch trunk updated: HDDS-1610. applyTransaction failure should not be lost on restart. Contributed by Shashikant Banerjee.

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6244502  HDDS-1610. applyTransaction failure should not be lost on restart. Contributed by Shashikant Banerjee.
6244502 is described below

commit 62445021d5d57b0d49adcb1bd4365c13532328fc
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Tue Aug 20 14:37:01 2019 +0530

    HDDS-1610. applyTransaction failure should not be lost on restart. Contributed by Shashikant Banerjee.
---
 .../server/ratis/ContainerStateMachine.java        |  84 +++++++----
 .../transport/server/ratis/XceiverServerRatis.java |   9 ++
 .../proto/StorageContainerDatanodeProtocol.proto   |   1 +
 .../snapshot/DirectoryWithSnapshotFeature.java     |   4 +-
 .../rpc/TestContainerStateMachineFailures.java     | 156 +++++++++++++++------
 .../ozone/container/ContainerTestHelper.java       |  16 +++
 .../freon/TestFreonWithDatanodeFastRestart.java    |  17 +--
 7 files changed, 207 insertions(+), 80 deletions(-)
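
In brief, the patch keeps an applyTransaction failure from being lost on restart by refusing to take further snapshots once a transaction has failed: the last snapshot then stays at a pre-failure index, and the failed transaction is re-applied from the Raft log when the datanode restarts. Below is a minimal, self-contained sketch of that pattern (hypothetical class and method bodies, not the actual ContainerStateMachine):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: a failed applyTransaction marks the state machine unhealthy,
// and takeSnapshot refuses to run while unhealthy, so the last snapshot
// stays at a pre-failure index and the failed transaction is replayed from
// the Raft log after restart.
class UnhealthyAwareStateMachine {
  private final AtomicBoolean isStateMachineHealthy = new AtomicBoolean(true);

  void applyTransaction(Runnable op) {
    try {
      op.run();
    } catch (RuntimeException e) {
      // Remember the failure; snapshots must not advance past it.
      isStateMachineHealthy.compareAndSet(true, false);
      throw e;
    }
  }

  long takeSnapshot() throws IOException {
    if (!isStateMachineHealthy.get()) {
      // Failing here prevents the snapshot index from moving forward.
      throw new IOException("stateMachine is unhealthy; refusing snapshot");
    }
    // ... write the snapshot file, sync it, return the applied index ...
    return -1;
  }
}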

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index f4d4744..aadec8d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -83,6 +84,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -147,6 +149,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final Cache<Long, ByteString> stateMachineDataCache;
   private final boolean isBlockTokenEnabled;
   private final TokenVerifier tokenVerifier;
+  private final AtomicBoolean isStateMachineHealthy;
 
   private final Semaphore applyTransactionSemaphore;
   /**
@@ -184,6 +187,7 @@ public class ContainerStateMachine extends BaseStateMachine {
         ScmConfigKeys.
             DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
     applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
+    isStateMachineHealthy = new AtomicBoolean(true);
     this.executors = new ExecutorService[numContainerOpExecutors];
     for (int i = 0; i < numContainerOpExecutors; i++) {
       final int index = i;
@@ -265,6 +269,14 @@ public class ContainerStateMachine extends BaseStateMachine {
   public long takeSnapshot() throws IOException {
     TermIndex ti = getLastAppliedTermIndex();
     long startTime = Time.monotonicNow();
+    if (!isStateMachineHealthy.get()) {
+      String msg =
+          "Failed to take snapshot " + " for " + gid + " as the stateMachine"
+              + " is unhealthy. The last applied index is at " + ti;
+      StateMachineException sme = new StateMachineException(msg);
+      LOG.error(msg);
+      throw sme;
+    }
     if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
       final File snapshotFile =
           storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
@@ -275,12 +287,12 @@ public class ContainerStateMachine extends BaseStateMachine {
         // make sure the snapshot file is synced
         fos.getFD().sync();
       } catch (IOException ioe) {
-        LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti,
+        LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti,
             snapshotFile);
         throw ioe;
       }
-      LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}",
-          gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
+      LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}", gid, ti,
+          snapshotFile, (Time.monotonicNow() - startTime));
       return ti.getIndex();
     }
     return -1;
@@ -385,17 +397,12 @@ public class ContainerStateMachine extends BaseStateMachine {
     return response;
   }
 
-  private ContainerCommandResponseProto runCommandGetResponse(
+  private ContainerCommandResponseProto runCommand(
       ContainerCommandRequestProto requestProto,
       DispatcherContext context) {
     return dispatchCommand(requestProto, context);
   }
 
-  private Message runCommand(ContainerCommandRequestProto requestProto,
-      DispatcherContext context) {
-    return runCommandGetResponse(requestProto, context)::toByteString;
-  }
-
   private ExecutorService getCommandExecutor(
       ContainerCommandRequestProto requestProto) {
     int executorId = (int)(requestProto.getContainerID() % executors.length);
@@ -425,7 +432,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     // thread.
     CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
         CompletableFuture.supplyAsync(() ->
-            runCommandGetResponse(requestProto, context), chunkExecutor);
+            runCommand(requestProto, context), chunkExecutor);
 
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
 
@@ -502,7 +509,8 @@ public class ContainerStateMachine extends BaseStateMachine {
       metrics.incNumQueryStateMachineOps();
       final ContainerCommandRequestProto requestProto =
           getContainerCommandRequestProto(request.getContent());
-      return CompletableFuture.completedFuture(runCommand(requestProto, null));
+      return CompletableFuture
+          .completedFuture(runCommand(requestProto, null)::toByteString);
     } catch (IOException e) {
       metrics.incNumQueryStateMachineFails();
       return completeExceptionally(e);
@@ -674,30 +682,58 @@ public class ContainerStateMachine extends BaseStateMachine {
      if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
         builder.setCreateContainerSet(createContainerSet);
       }
+      CompletableFuture<Message> applyTransactionFuture =
+          new CompletableFuture<>();
       // Ensure the command gets executed in a separate thread than
       // stateMachineUpdater thread which is calling applyTransaction here.
-      CompletableFuture<Message> future = CompletableFuture
-          .supplyAsync(() -> runCommand(requestProto, builder.build()),
+      CompletableFuture<ContainerCommandResponseProto> future =
+          CompletableFuture.supplyAsync(
+              () -> runCommand(requestProto, builder.build()),
               getCommandExecutor(requestProto));
-
-      future.thenAccept(m -> {
+      future.thenApply(r -> {
         if (trx.getServerRole() == RaftPeerRole.LEADER) {
           long startTime = (long) trx.getStateMachineContext();
           metrics.incPipelineLatency(cmdType,
               Time.monotonicNowNanos() - startTime);
         }
-
-        final Long previous =
-            applyTransactionCompletionMap
+        if (r.getResult() != ContainerProtos.Result.SUCCESS) {
+          StorageContainerException sce =
+              new StorageContainerException(r.getMessage(), r.getResult());
+          LOG.error(
+              "gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
+                  + "{} Container Result: {}", gid, r.getCmdType(), index,
+              r.getMessage(), r.getResult());
+          metrics.incNumApplyTransactionsFails();
+          // Since applyTransaction now completes exceptionally, the
+          // exception will be caught by the stateMachineUpdater in Ratis
+          // before any further snapshot is taken, and the Ratis server
+          // will shut down.
+          applyTransactionFuture.completeExceptionally(sce);
+          isStateMachineHealthy.compareAndSet(true, false);
+          ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
+        } else {
+          LOG.debug(
+              "gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
+                  + "{} Container Result: {}", gid, r.getCmdType(), index,
+              r.getMessage(), r.getResult());
+          applyTransactionFuture.complete(r::toByteString);
+          if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
+            metrics.incNumBytesCommittedCount(
+                requestProto.getWriteChunk().getChunkData().getLen());
+          }
+          // add the entry to the applyTransactionCompletionMap only if the
+          // stateMachine is healthy, i.e., there have been no prior
+          // applyTransaction failures.
+          if (isStateMachineHealthy.get()) {
+            final Long previous = applyTransactionCompletionMap
                 .put(index, trx.getLogEntry().getTerm());
-        Preconditions.checkState(previous == null);
-        if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
-          metrics.incNumBytesCommittedCount(
-              requestProto.getWriteChunk().getChunkData().getLen());
+            Preconditions.checkState(previous == null);
+            updateLastApplied();
+          }
         }
-        updateLastApplied();
+        return applyTransactionFuture;
       }).whenComplete((r, t) -> applyTransactionSemaphore.release());
-      return future;
+      return applyTransactionFuture;
     } catch (IOException | InterruptedException e) {
       metrics.incNumApplyTransactionsFails();
       return completeExceptionally(e);
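
The applyTransaction change above hinges on completing a separately constructed future by hand from inside thenApply, exceptionally on a failed container result, so that the consumer of the returned future (Ratis's stateMachineUpdater in the real code) observes the failure rather than a normal completion. A minimal sketch of that shape, with a hypothetical Result type standing in for ContainerCommandResponseProto:

import java.util.concurrent.CompletableFuture;

public class ApplyFutureSketch {
  // Hypothetical stand-in for a container command response.
  static final class Result {
    final boolean success;
    final String message;
    Result(boolean success, String message) {
      this.success = success;
      this.message = message;
    }
  }

  static CompletableFuture<String> apply(CompletableFuture<Result> pipeline) {
    CompletableFuture<String> applyFuture = new CompletableFuture<>();
    pipeline.thenApply(r -> {
      if (!r.success) {
        // Propagate the failure to the caller's future.
        applyFuture.completeExceptionally(
            new IllegalStateException(r.message));
      } else {
        applyFuture.complete(r.message);
      }
      return applyFuture;
    });
    return applyFuture;
  }

  public static void main(String[] args) {
    // A failed result completes the returned future exceptionally.
    apply(CompletableFuture.completedFuture(new Result(false, "disk error")))
        .whenComplete((v, t) -> System.out.println("completed with: " + t));
  }
}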
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 54e8f3e..4cb4cbb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -609,6 +609,15 @@ public final class XceiverServerRatis extends XceiverServer {
     handlePipelineFailure(groupId, roleInfoProto);
   }
 
+  void handleApplyTransactionFailure(RaftGroupId groupId,
+      RaftProtos.RaftPeerRole role) {
+    UUID dnId = RatisHelper.toDatanodeId(getServer().getId());
+    String msg =
+        "Ratis Transaction failure in datanode " + dnId + " with role " + role
+            + " .Triggering pipeline close action.";
+    triggerPipelineClose(groupId, msg,
+        ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true);
+  }
   /**
    * The snapshot contents cannot be used to actually catch up
    * the follower, which is the reason to initiate close pipeline and
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 500735a..1d09dfa 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -214,6 +214,7 @@ message ClosePipelineInfo {
   enum Reason {
     PIPELINE_FAILED = 1;
     PIPELINE_LOG_FAILED = 2;
+    STATEMACHINE_TRANSACTION_FAILED = 3;
   }
   required PipelineID pipelineID = 1;
   optional Reason reason = 3;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
index 7fb639c..ffbc174 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
@@ -742,8 +742,8 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
             if (currentINode.isLastReference()) {
               // if this is the last reference, the created list can be
               // destroyed.
-              priorDiff.getChildrenDiff().destroyCreatedList(
-                  reclaimContext, currentINode);
+         //     priorDiff.getChildrenDiff().destroyCreatedList(
+         //         reclaimContext, currentINode);
             } else {
               // we only check the node originally in prior's created list
               for (INode cNode : priorDiff.diff.getCreatedUnmodifiable()) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 469eeb0..86621d6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -22,23 +22,32 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftRetryFailureException;
+import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -46,6 +55,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -54,7 +64,8 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+    ContainerDataProto.State.UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -77,7 +88,7 @@ public class TestContainerStateMachineFailures {
   private static String volumeName;
   private static String bucketName;
   private static String path;
-  private static int chunkSize;
+  private static XceiverClientManager xceiverClientManager;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -101,6 +112,11 @@ public class TestContainerStateMachineFailures {
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
         TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
+        1, TimeUnit.SECONDS);
+    conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
     conf.setQuietMode(false);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
@@ -109,6 +125,7 @@ public class TestContainerStateMachineFailures {
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
     objectStore = client.getObjectStore();
+    xceiverClientManager = new XceiverClientManager(conf);
     volumeName = "testcontainerstatemachinefailures";
     bucketName = volumeName;
     objectStore.createVolume(volumeName);
@@ -132,19 +149,10 @@ public class TestContainerStateMachineFailures {
             .createKey("ratis", 1024, ReplicationType.RATIS,
                 ReplicationFactor.ONE, new HashMap<>());
     byte[] testData = "ratis".getBytes();
-    long written = 0;
     // First write and flush creates a container in the datanode
     key.write(testData);
-    written += testData.length;
     key.flush();
     key.write(testData);
-    written += testData.length;
-
-    //get the name of a valid container
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
-        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
-        .build();
     KeyOutputStream groupOutputStream =
         (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
@@ -157,7 +165,14 @@ public class TestContainerStateMachineFailures {
             .getContainer().getContainerSet()
             .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
             .getContainerPath()));
-    key.close();
+    try {
+      // there is only 1 datanode in the pipeline; the pipeline will be
+      // closed and allocation of a new pipeline will fail as there is no
+      // other datanode in the cluster
+      key.close();
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe instanceof OMException);
+    }
     long containerID = omKeyLocationInfo.getContainerID();
 
     // Make sure the container is marked unhealthy
@@ -179,22 +194,6 @@ public class TestContainerStateMachineFailures {
         .getDatanodeStateMachine().getContainer();
     Assert
         .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
-
-    OzoneKeyDetails keyDetails = objectStore.getVolume(volumeName)
-        .getBucket(bucketName).getKey("ratis");
-
-    /**
-     * Ensure length of data stored in key is equal to number of bytes written.
-     */
-    Assert.assertTrue("Number of bytes stored in the key is not equal " +
-        "to number of bytes written.", keyDetails.getDataSize() == written);
-
-    /**
-     * Pending data from the second write should get written to a new container
-     * during key.close() because the first container is UNHEALTHY by that time
-     */
-    Assert.assertTrue("Expect Key to be stored in 2 separate containers",
-        keyDetails.getOzoneKeyLocations().size() == 2);
   }
 
   @Test
@@ -207,12 +206,6 @@ public class TestContainerStateMachineFailures {
     key.write("ratis".getBytes());
     key.flush();
     key.write("ratis".getBytes());
-
-    //get the name of a valid container
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
-        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
-        .build();
     KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
         groupOutputStream.getLocationInfoList();
@@ -228,8 +221,14 @@ public class TestContainerStateMachineFailures {
         (KeyValueContainerData) containerData;
     // delete the container db file
     FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
-
-    key.close();
+    try {
+      // there is only 1 datanode in the pipeline; the pipeline will be
+      // closed and allocation of a new pipeline will fail as there is no
+      // other datanode in the cluster
+      key.close();
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe instanceof OMException);
+    }
 
     long containerID = omKeyLocationInfo.getContainerID();
 
@@ -270,4 +269,83 @@ public class TestContainerStateMachineFailures {
     Assert.assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY,
         dispatcher.dispatch(request.build(), null).getResult());
   }
+
+  @Test
+  public void testApplyTransactionFailure() throws Exception {
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis", 1024, ReplicationType.RATIS,
+                ReplicationFactor.ONE, new HashMap<>());
+    // First write and flush creates a container in the datanode
+    key.write("ratis".getBytes());
+    key.flush();
+    key.write("ratis".getBytes());
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    ContainerData containerData =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet()
+            .getContainer(omKeyLocationInfo.getContainerID())
+            .getContainerData();
+    Assert.assertTrue(containerData instanceof KeyValueContainerData);
+    KeyValueContainerData keyValueContainerData =
+        (KeyValueContainerData) containerData;
+    key.close();
+    ContainerStateMachine stateMachine =
+        (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
+    SimpleStateMachineStorage storage =
+        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+    Path parentPath = storage.findLatestSnapshot().getFile().getPath();
+    // Since the snapshot threshold is set to 1 and applyTransactions have
+    // run, we should see snapshots
+    Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
+    FileInfo snapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertNotNull(snapshot);
+    long containerID = omKeyLocationInfo.getContainerID();
+    // delete the container db file
+    FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
+    Pipeline pipeline = cluster.getStorageContainerLocationClient()
+        .getContainerWithPipeline(containerID).getPipeline();
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+    request.setCmdType(ContainerProtos.Type.CloseContainer);
+    request.setContainerID(containerID);
+    request.setCloseContainer(
+        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    // the close container transaction will fail in Ratis and will initiate
+    // a pipeline close action
+
+    // Since the applyTransaction failure is propagated to Ratis,
+    // stateMachineUpdater will hit the exception while taking the next
+    // snapshot and should shut down the RaftServerImpl. The client request
+    // will fail with RaftRetryFailureException.
+    try {
+      xceiverClient.sendCommand(request.build());
+      Assert.fail("Expected exception not thrown");
+    } catch (IOException e) {
+      Assert.assertTrue(HddsClientUtils
+          .checkForException(e) instanceof RaftRetryFailureException);
+    }
+    // Make sure the container is marked unhealthy
+    Assert.assertTrue(
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet().getContainer(containerID)
+            .getContainerState()
+            == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+    try {
+      // try to take a new snapshot; it should fail as the stateMachine is unhealthy
+      stateMachine.takeSnapshot();
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe instanceof StateMachineException);
+    }
+    // Make sure the latest snapshot is the same as the previous one
+    FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 4da1907..82d34d7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -73,6 +73,10 @@ import org.apache.hadoop.security.token.Token;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -866,4 +870,16 @@ public final class ContainerTestHelper {
       index++;
     }
   }
+
+  public static StateMachine getStateMachine(MiniOzoneCluster cluster)
+      throws Exception {
+    XceiverServerSpi server =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
+            getContainer().getWriteChannel();
+    RaftServerProxy proxy =
+        (RaftServerProxy) (((XceiverServerRatis) server).getServer());
+    RaftGroupId groupId = proxy.getGroupIds().iterator().next();
+    RaftServerImpl impl = proxy.getImpl(groupId);
+    return impl.getStateMachine();
+  }
 }
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
index 8ed3960..545f2b3 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
@@ -22,13 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.container.common.transport
-    .server.XceiverServerSpi;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis
-    .XceiverServerRatis;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
@@ -127,13 +121,6 @@ public class TestFreonWithDatanodeFastRestart {
   }
 
   private StateMachine getStateMachine() throws Exception {
-    XceiverServerSpi server =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
-            getContainer().getWriteChannel();
-    RaftServerProxy proxy =
-        (RaftServerProxy)(((XceiverServerRatis)server).getServer());
-    RaftGroupId groupId = proxy.getGroupIds().iterator().next();
-    RaftServerImpl impl = proxy.getImpl(groupId);
-    return impl.getStateMachine();
+    return ContainerTestHelper.getStateMachine(cluster);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org