Posted to commits@ozone.apache.org by ad...@apache.org on 2020/06/26 10:08:06 UTC

[hadoop-ozone] branch master updated: HDDS-3018. Fix TestContainerStateMachineFailures.java (#556)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 025fc54  HDDS-3018. Fix TestContainerStateMachineFailures.java (#556)
025fc54 is described below

commit 025fc547f3c8092ba687311c25b322698804f14a
Author: bshashikant <sh...@apache.org>
AuthorDate: Fri Jun 26 15:37:51 2020 +0530

    HDDS-3018. Fix TestContainerStateMachineFailures.java (#556)
---
 .../server/ratis/ContainerStateMachine.java        |  14 +-
 .../rpc/TestContainerStateMachineFailures.java     | 419 +++++++++------------
 .../client/rpc/TestValidateBCSIDOnRestart.java     | 258 +++++++++++++
 .../apache/hadoop/ozone/container/TestHelper.java  |  10 +
 4 files changed, 459 insertions(+), 242 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 96675c8..859cd20 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -237,19 +237,25 @@ public class ContainerStateMachine extends BaseStateMachine {
 
     // initialize the dispatcher with snapshot so that it builds the missing
     // container list
+    buildMissingContainerSet(snapshotFile);
+    return last.getIndex();
+  }
+
+  @VisibleForTesting
+  public void buildMissingContainerSet(File snapshotFile) throws IOException {
+    // initialize the dispatcher with snapshot so that it builds the missing
+    // container list
     try (FileInputStream fin = new FileInputStream(snapshotFile)) {
       ContainerProtos.Container2BCSIDMapProto proto =
-          ContainerProtos.Container2BCSIDMapProto
-              .parseFrom(fin);
+              ContainerProtos.Container2BCSIDMapProto
+                      .parseFrom(fin);
       // read the created containers list from the snapshot file and add it to
       // the container2BCSIDMap here.
       // container2BCSIDMap will further grow as and when containers get created
       container2BCSIDMap.putAll(proto.getContainer2BCSIDMap());
       dispatcher.buildMissingContainerSetAndValidate(container2BCSIDMap);
     }
-    return last.getIndex();
   }
-
   /**
    * As a part of taking snapshot with Ratis StateMachine, it will persist
    * the existing container set in the snapshotFile.
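
The refactoring above pulls the snapshot-loading logic out into a public,
@VisibleForTesting buildMissingContainerSet(File) method, so a test can replay
a Ratis snapshot file through the dispatcher directly. A minimal sketch of
that usage, mirroring the new TestValidateBCSIDOnRestart below (stateMachine
and storage are obtained via TestHelper, as in that test):

    // Take a snapshot, then feed the snapshot file back through the state
    // machine to rebuild the missing-container set.
    stateMachine.takeSnapshot();
    Path parentPath = storage.findLatestSnapshot().getFile().getPath();
    stateMachine.buildMissingContainerSet(parentPath.toFile());
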
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 17875e9..1e44562 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -17,19 +17,20 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
-import com.google.common.primitives.Longs;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -43,22 +44,18 @@ import org.apache.hadoop.ozone.container.TestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
-import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+        .ContainerStateMachine;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
@@ -68,6 +65,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -81,8 +79,6 @@ import static org.apache.hadoop.hdds.HddsConfigKeys
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
     ContainerDataProto.State.UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
-    HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     OZONE_SCM_STALENODE_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
@@ -94,7 +90,6 @@ import static org.junit.Assert.fail;
  * Tests the containerStateMachine failure handling.
  */
 
-@Ignore
 public class TestContainerStateMachineFailures {
 
   private static MiniOzoneCluster cluster;
@@ -103,7 +98,6 @@ public class TestContainerStateMachineFailures {
   private static ObjectStore objectStore;
   private static String volumeName;
   private static String bucketName;
-  private static String path;
   private static XceiverClientManager xceiverClientManager;
 
   /**
@@ -114,32 +108,46 @@ public class TestContainerStateMachineFailures {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
-    path = GenericTestUtils
-        .getTempPath(TestContainerStateMachineFailures.class.getSimpleName());
-    File baseDir = new File(path);
-    baseDir.mkdirs();
-
-
+    conf.setBoolean(OzoneConfigKeys
+            .OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
     conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
-    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1,
         TimeUnit.SECONDS);
+
     conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
-        + ".client.request.write.timeout", 30, TimeUnit.SECONDS);
+            + ".client.request.write.timeout", 10, TimeUnit.SECONDS);
     conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
-        + ".client.request.watch.timeout", 30, TimeUnit.SECONDS);
+            + ".client.request.watch.timeout", 10, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+                    DatanodeRatisServerConfig.RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+            3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+                    DatanodeRatisServerConfig
+                            .RATIS_SERVER_WATCH_REQUEST_TIMEOUT_KEY,
+            10, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "." +
+                    "rpc.request.timeout",
+            3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "." +
+                    "watch.request.timeout",
+            10, TimeUnit.SECONDS);
     conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
     conf.setQuietMode(false);
     cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10).setHbInterval(200)
             .build();
     cluster.waitForClusterToBeReady();
+    cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getRpcClient(conf);
     objectStore = client.getObjectStore();
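
Note the cluster now runs 10 datanodes (up from 3), presumably so a fresh
RATIS/ONE pipeline can still be allocated after one is destroyed, and it
waits for such a pipeline before the tests start. The tests below also share
a common restart idiom, sketched here (dn is the datanode hosting the key's
container, resolved via the new TestHelper.getDatanodeService):

    // Point the datanode at a fresh Ratis storage dir before restarting it,
    // so stale Ratis logs are not replayed into the restarted datanode.
    OzoneConfiguration config = dn.getConf();
    String dir = config.get(
        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR)
        + UUID.randomUUID();
    config.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
    cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
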
@@ -163,80 +171,85 @@ public class TestContainerStateMachineFailures {
   @Test
   public void testContainerStateMachineFailures() throws Exception {
     OzoneOutputStream key =
-        objectStore.getVolume(volumeName).getBucket(bucketName)
-            .createKey("ratis", 1024, ReplicationType.RATIS,
-                ReplicationFactor.ONE, new HashMap<>());
+            objectStore.getVolume(volumeName).getBucket(bucketName)
+                    .createKey("ratis", 1024, ReplicationType.RATIS,
+                            ReplicationFactor.ONE, new HashMap<>());
     byte[] testData = "ratis".getBytes();
     // First write and flush creates a container in the datanode
     key.write(testData);
     key.flush();
     key.write(testData);
     KeyOutputStream groupOutputStream =
-        (KeyOutputStream) key.getOutputStream();
+            (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
-        groupOutputStream.getLocationInfoList();
+            groupOutputStream.getLocationInfoList();
     Assert.assertEquals(1, locationInfoList.size());
     OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+            cluster);
     // delete the container dir
-    FileUtil.fullyDelete(new File(
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet()
-            .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
-            .getContainerPath()));
+    FileUtil.fullyDelete(new File(dn.getDatanodeStateMachine()
+                    .getContainer().getContainerSet()
+                    .getContainer(omKeyLocationInfo.getContainerID())
+                    .getContainerData().getContainerPath()));
     try {
       // there is only 1 datanode in the pipeline, the pipeline will be closed
       // and allocation to new pipeline will fail as there is no other dn in
       // the cluster
       key.close();
-    } catch(IOException ioe) {
-      Assert.assertTrue(ioe instanceof OMException);
+    } catch (IOException ioe) {
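+      // expected: close() fails since the container dir was deleted above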
     }
     long containerID = omKeyLocationInfo.getContainerID();
 
     // Make sure the container is marked unhealthy
     Assert.assertTrue(
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet()
-            .getContainer(containerID)
-            .getContainerState()
-            == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
-    OzoneContainer ozoneContainer = cluster.getHddsDatanodes().get(0)
-        .getDatanodeStateMachine().getContainer();
-    // make sure the missing containerSet is empty
-    HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
-    Assert.assertTrue(dispatcher.getMissingContainerSet().isEmpty());
+            dn.getDatanodeStateMachine()
+                    .getContainer().getContainerSet()
+                    .getContainer(containerID)
+                    .getContainerState()
+                    == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+    OzoneContainer ozoneContainer;
 
     // restart the hdds datanode, container should not in the regular set
-    cluster.restartHddsDatanode(0, true);
-    ozoneContainer = cluster.getHddsDatanodes().get(0)
-        .getDatanodeStateMachine().getContainer();
-    Assert
-        .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
+    OzoneConfiguration config = dn.getConf();
+    final String dir = config.get(OzoneConfigKeys
+            .DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR)
+            + UUID.randomUUID();
+    config.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
+    int index = cluster.getHddsDatanodeIndex(dn.getDatanodeDetails());
+    cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
+    ozoneContainer = cluster.getHddsDatanodes().get(index)
+            .getDatanodeStateMachine().getContainer();
+    Assert.assertNull(ozoneContainer.getContainerSet()
+                    .getContainer(containerID));
   }
 
   @Test
   public void testUnhealthyContainer() throws Exception {
     OzoneOutputStream key =
-        objectStore.getVolume(volumeName).getBucket(bucketName)
-            .createKey("ratis", 1024, ReplicationType.RATIS,
-                ReplicationFactor.ONE, new HashMap<>());
+            objectStore.getVolume(volumeName).getBucket(bucketName)
+                    .createKey("ratis", 1024, ReplicationType.RATIS,
+                            ReplicationFactor.ONE, new HashMap<>());
     // First write and flush creates a container in the datanode
     key.write("ratis".getBytes());
     key.flush();
     key.write("ratis".getBytes());
-    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key
+            .getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
-        groupOutputStream.getLocationInfoList();
+            groupOutputStream.getLocationInfoList();
     Assert.assertEquals(1, locationInfoList.size());
     OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+            cluster);
     ContainerData containerData =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet()
-            .getContainer(omKeyLocationInfo.getContainerID())
-            .getContainerData();
+            dn.getDatanodeStateMachine()
+                    .getContainer().getContainerSet()
+                    .getContainer(omKeyLocationInfo.getContainerID())
+                    .getContainerData();
     Assert.assertTrue(containerData instanceof KeyValueContainerData);
     KeyValueContainerData keyValueContainerData =
-        (KeyValueContainerData) containerData;
+            (KeyValueContainerData) containerData;
     // delete the container db file
     FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
     try {
@@ -244,78 +257,90 @@ public class TestContainerStateMachineFailures {
       // and allocation to new pipeline will fail as there is no other dn in
       // the cluster
       key.close();
-    } catch(IOException ioe) {
-      Assert.assertTrue(ioe instanceof OMException);
+    } catch (IOException ioe) {
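+      // expected: close() fails since the container chunks dir was deleted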
     }
 
     long containerID = omKeyLocationInfo.getContainerID();
 
     // Make sure the container is marked unhealthy
     Assert.assertTrue(
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet().getContainer(containerID)
-            .getContainerState()
-            == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+            dn.getDatanodeStateMachine()
+                    .getContainer().getContainerSet().getContainer(containerID)
+                    .getContainerState()
+                    == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
     // Check metadata in the .container file
     File containerFile = new File(keyValueContainerData.getMetadataPath(),
-        containerID + OzoneConsts.CONTAINER_EXTENSION);
+            containerID + OzoneConsts.CONTAINER_EXTENSION);
 
     keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
-        .readContainerFile(containerFile);
+            .readContainerFile(containerFile);
     assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
 
+    OzoneConfiguration config = dn.getConf();
+    final String dir = config.get(OzoneConfigKeys
+            .DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR)
+            + UUID.randomUUID();
+    config.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
+    int index = cluster.getHddsDatanodeIndex(dn.getDatanodeDetails());
     // restart the hdds datanode and see if the container is listed in the
     // in the missing container set and not in the regular set
-    cluster.restartHddsDatanode(0, true);
+    cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
     // make sure the container state is still marked unhealthy after restart
     keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
-        .readContainerFile(containerFile);
+            .readContainerFile(containerFile);
     assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
 
     OzoneContainer ozoneContainer;
-    ozoneContainer = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-        .getContainer();
-    HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
+    HddsDatanodeService dnService = cluster.getHddsDatanodes().get(index);
+    ozoneContainer = dnService
+            .getDatanodeStateMachine().getContainer();
+    HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer
+            .getDispatcher();
     ContainerProtos.ContainerCommandRequestProto.Builder request =
-        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+            ContainerProtos.ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.CloseContainer);
     request.setContainerID(containerID);
     request.setCloseContainer(
-        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
-    request.setDatanodeUuid(
-        cluster.getHddsDatanodes().get(0).getDatanodeDetails().getUuidString());
+            ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    request.setDatanodeUuid(dnService.getDatanodeDetails().getUuidString());
     Assert.assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY,
-        dispatcher.dispatch(request.build(), null).getResult());
+            dispatcher.dispatch(request.build(), null)
+                    .getResult());
   }
 
   @Test
   public void testApplyTransactionFailure() throws Exception {
     OzoneOutputStream key =
-        objectStore.getVolume(volumeName).getBucket(bucketName)
-            .createKey("ratis", 1024, ReplicationType.RATIS,
-                ReplicationFactor.ONE, new HashMap<>());
+            objectStore.getVolume(volumeName).getBucket(bucketName)
+                    .createKey("ratis", 1024, ReplicationType.RATIS,
+                            ReplicationFactor.ONE, new HashMap<>());
     // First write and flush creates a container in the datanode
     key.write("ratis".getBytes());
     key.flush();
     key.write("ratis".getBytes());
-    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key
+            .getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
-        groupOutputStream.getLocationInfoList();
+            groupOutputStream.getLocationInfoList();
     Assert.assertEquals(1, locationInfoList.size());
     OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
-    ContainerData containerData =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet()
-            .getContainer(omKeyLocationInfo.getContainerID())
-            .getContainerData();
+    HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+            cluster);
+    int index = cluster.getHddsDatanodeIndex(dn.getDatanodeDetails());
+    ContainerData containerData = dn.getDatanodeStateMachine()
+                    .getContainer().getContainerSet()
+                    .getContainer(omKeyLocationInfo.getContainerID())
+                    .getContainerData();
     Assert.assertTrue(containerData instanceof KeyValueContainerData);
     KeyValueContainerData keyValueContainerData =
-        (KeyValueContainerData) containerData;
+            (KeyValueContainerData) containerData;
     key.close();
     ContainerStateMachine stateMachine =
-        (ContainerStateMachine) TestHelper.getStateMachine(cluster);
+        (ContainerStateMachine) TestHelper.getStateMachine(cluster
+            .getHddsDatanodes().get(index), omKeyLocationInfo.getPipeline());
     SimpleStateMachineStorage storage =
-        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+            (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+    stateMachine.takeSnapshot();
     Path parentPath = storage.findLatestSnapshot().getFile().getPath();
     // Since the snapshot threshold is set to 1, since there are
     // applyTransactions, we should see snapshots
@@ -326,36 +351,30 @@ public class TestContainerStateMachineFailures {
     // delete the container db file
     FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
     Pipeline pipeline = cluster.getStorageContainerLocationClient()
-        .getContainerWithPipeline(containerID).getPipeline();
+            .getContainerWithPipeline(containerID).getPipeline();
     XceiverClientSpi xceiverClient =
-        xceiverClientManager.acquireClient(pipeline);
+            xceiverClientManager.acquireClient(pipeline);
     ContainerProtos.ContainerCommandRequestProto.Builder request =
-        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+            ContainerProtos.ContainerCommandRequestProto.newBuilder();
     request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
     request.setCmdType(ContainerProtos.Type.CloseContainer);
     request.setContainerID(containerID);
     request.setCloseContainer(
-        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+            ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
     // close container transaction will fail over Ratis and will initiate
     // a pipeline close action
 
-    // Since the applyTransaction failure is propagated to Ratis,
-    // stateMachineUpdater will it exception while taking the next snapshot
-    // and should shutdown the RaftServerImpl. The client request will fail
-    // with RaftRetryFailureException.
     try {
       xceiverClient.sendCommand(request.build());
       Assert.fail("Expected exception not thrown");
     } catch (IOException e) {
-      Assert.assertTrue(HddsClientUtils
-          .checkForException(e) instanceof RaftRetryFailureException);
+      // Exception should be thrown
     }
     // Make sure the container is marked unhealthy
-    Assert.assertTrue(
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet().getContainer(containerID)
-            .getContainerState()
-            == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+    Assert.assertTrue(dn.getDatanodeStateMachine()
+                    .getContainer().getContainerSet().getContainer(containerID)
+                    .getContainerState()
+                    == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
     try {
       // try to take a new snapshot, ideally it should just fail
       stateMachine.takeSnapshot();
@@ -369,59 +388,61 @@ public class TestContainerStateMachineFailures {
 
   @Test
   public void testApplyTransactionIdempotencyWithClosedContainer()
-      throws Exception {
+          throws Exception {
     OzoneOutputStream key =
-        objectStore.getVolume(volumeName).getBucket(bucketName)
-            .createKey("ratis", 1024, ReplicationType.RATIS,
-                ReplicationFactor.ONE, new HashMap<>());
+            objectStore.getVolume(volumeName).getBucket(bucketName)
+                    .createKey("ratis", 1024, ReplicationType.RATIS,
+                            ReplicationFactor.ONE, new HashMap<>());
     // First write and flush creates a container in the datanode
     key.write("ratis".getBytes());
     key.flush();
     key.write("ratis".getBytes());
     KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
-        groupOutputStream.getLocationInfoList();
+            groupOutputStream.getLocationInfoList();
     Assert.assertEquals(1, locationInfoList.size());
     OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
-    ContainerData containerData =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet()
-            .getContainer(omKeyLocationInfo.getContainerID())
-            .getContainerData();
+    HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+            cluster);
+    ContainerData containerData = dn.getDatanodeStateMachine()
+                    .getContainer().getContainerSet()
+                    .getContainer(omKeyLocationInfo.getContainerID())
+                    .getContainerData();
     Assert.assertTrue(containerData instanceof KeyValueContainerData);
     key.close();
     ContainerStateMachine stateMachine =
-        (ContainerStateMachine) TestHelper.getStateMachine(cluster);
+            (ContainerStateMachine) TestHelper.getStateMachine(dn,
+                    omKeyLocationInfo.getPipeline());
     SimpleStateMachineStorage storage =
-        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+            (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
     Path parentPath = storage.findLatestSnapshot().getFile().getPath();
-    // Since the snapshot threshold is set to 1, since there are
-    // applyTransactions, we should see snapshots
+    stateMachine.takeSnapshot();
     Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
     FileInfo snapshot = storage.findLatestSnapshot().getFile();
     Assert.assertNotNull(snapshot);
     long containerID = omKeyLocationInfo.getContainerID();
     Pipeline pipeline = cluster.getStorageContainerLocationClient()
-        .getContainerWithPipeline(containerID).getPipeline();
+            .getContainerWithPipeline(containerID).getPipeline();
     XceiverClientSpi xceiverClient =
-        xceiverClientManager.acquireClient(pipeline);
+            xceiverClientManager.acquireClient(pipeline);
     ContainerProtos.ContainerCommandRequestProto.Builder request =
-        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+            ContainerProtos.ContainerCommandRequestProto.newBuilder();
     request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
     request.setCmdType(ContainerProtos.Type.CloseContainer);
     request.setContainerID(containerID);
     request.setCloseContainer(
-        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+            ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
     try {
       xceiverClient.sendCommand(request.build());
     } catch (IOException e) {
       Assert.fail("Exception should not be thrown");
     }
     Assert.assertTrue(
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet().getContainer(containerID)
-            .getContainerState()
-            == ContainerProtos.ContainerDataProto.State.CLOSED);
+            TestHelper.getDatanodeService(omKeyLocationInfo, cluster)
+                    .getDatanodeStateMachine()
+                    .getContainer().getContainerSet().getContainer(containerID)
+                    .getContainerState()
+                    == ContainerProtos.ContainerDataProto.State.CLOSED);
     Assert.assertTrue(stateMachine.isStateMachineHealthy());
     try {
       stateMachine.takeSnapshot();
@@ -440,32 +461,37 @@ public class TestContainerStateMachineFailures {
   // closed here.
   @Test
   public void testWriteStateMachineDataIdempotencyWithClosedContainer()
-      throws Exception {
+          throws Exception {
     OzoneOutputStream key =
-        objectStore.getVolume(volumeName).getBucket(bucketName)
-            .createKey("ratis-1", 1024, ReplicationType.RATIS,
-                ReplicationFactor.ONE, new HashMap<>());
+            objectStore.getVolume(volumeName).getBucket(bucketName)
+                    .createKey("ratis-1", 1024, ReplicationType.RATIS,
+                            ReplicationFactor.ONE, new HashMap<>());
     // First write and flush creates a container in the datanode
     key.write("ratis".getBytes());
     key.flush();
     key.write("ratis".getBytes());
-    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key
+            .getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
-        groupOutputStream.getLocationInfoList();
+            groupOutputStream.getLocationInfoList();
     Assert.assertEquals(1, locationInfoList.size());
     OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+            cluster);
     ContainerData containerData =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet()
-            .getContainer(omKeyLocationInfo.getContainerID())
-            .getContainerData();
+            dn.getDatanodeStateMachine()
+                    .getContainer().getContainerSet()
+                    .getContainer(omKeyLocationInfo.getContainerID())
+                    .getContainerData();
     Assert.assertTrue(containerData instanceof KeyValueContainerData);
     key.close();
     ContainerStateMachine stateMachine =
-        (ContainerStateMachine) TestHelper.getStateMachine(cluster);
+            (ContainerStateMachine) TestHelper.getStateMachine(dn,
+                    omKeyLocationInfo.getPipeline());
     SimpleStateMachineStorage storage =
-        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+            (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
     Path parentPath = storage.findLatestSnapshot().getFile().getPath();
+    stateMachine.takeSnapshot();
     // Since the snapshot threshold is set to 1, since there are
     // applyTransactions, we should see snapshots
     Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
@@ -473,21 +499,22 @@ public class TestContainerStateMachineFailures {
     Assert.assertNotNull(snapshot);
     long containerID = omKeyLocationInfo.getContainerID();
     Pipeline pipeline = cluster.getStorageContainerLocationClient()
-        .getContainerWithPipeline(containerID).getPipeline();
+            .getContainerWithPipeline(containerID).getPipeline();
     XceiverClientSpi xceiverClient =
-        xceiverClientManager.acquireClient(pipeline);
+            xceiverClientManager.acquireClient(pipeline);
     CountDownLatch latch = new CountDownLatch(100);
     int count = 0;
     AtomicInteger failCount = new AtomicInteger(0);
     Runnable r1 = () -> {
       try {
         ContainerProtos.ContainerCommandRequestProto.Builder request =
-            ContainerProtos.ContainerCommandRequestProto.newBuilder();
+                ContainerProtos.ContainerCommandRequestProto.newBuilder();
         request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
         request.setCmdType(ContainerProtos.Type.CloseContainer);
         request.setContainerID(containerID);
         request.setCloseContainer(
-            ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+                ContainerProtos.CloseContainerRequestProto
+                        .getDefaultInstance());
         xceiverClient.sendCommand(request.build());
       } catch (IOException e) {
         failCount.incrementAndGet();
@@ -496,13 +523,13 @@ public class TestContainerStateMachineFailures {
     Runnable r2 = () -> {
       try {
         xceiverClient.sendCommand(ContainerTestHelper
-            .getWriteChunkRequest(pipeline, omKeyLocationInfo.getBlockID(),
-                1024, new Random().nextInt(), null));
+                .getWriteChunkRequest(pipeline, omKeyLocationInfo.getBlockID(),
+                        1024, new Random().nextInt(), null));
         latch.countDown();
       } catch (IOException e) {
         latch.countDown();
         if (!(HddsClientUtils
-            .checkForException(e) instanceof ContainerNotOpenException)) {
+                .checkForException(e) instanceof ContainerNotOpenException)) {
           failCount.incrementAndGet();
         }
       }
@@ -529,10 +556,11 @@ public class TestContainerStateMachineFailures {
       fail("testWriteStateMachineDataIdempotencyWithClosedContainer failed");
     }
     Assert.assertTrue(
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet().getContainer(containerID)
-            .getContainerState()
-            == ContainerProtos.ContainerDataProto.State.CLOSED);
+            TestHelper.getDatanodeService(omKeyLocationInfo, cluster)
+                    .getDatanodeStateMachine()
+                    .getContainer().getContainerSet().getContainer(containerID)
+                    .getContainerState()
+                    == ContainerProtos.ContainerDataProto.State.CLOSED);
     Assert.assertTrue(stateMachine.isStateMachineHealthy());
     try {
       stateMachine.takeSnapshot();
@@ -543,89 +571,4 @@ public class TestContainerStateMachineFailures {
     Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath()));
 
   }
-
-  @Test
-  public void testValidateBCSIDOnDnRestart() throws Exception {
-    OzoneOutputStream key =
-        objectStore.getVolume(volumeName).getBucket(bucketName)
-            .createKey("ratis", 1024, ReplicationType.RATIS,
-                ReplicationFactor.ONE, new HashMap<>());
-    // First write and flush creates a container in the datanode
-    key.write("ratis".getBytes());
-    key.flush();
-    key.write("ratis".getBytes());
-    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
-    List<OmKeyLocationInfo> locationInfoList =
-        groupOutputStream.getLocationInfoList();
-    Assert.assertEquals(1, locationInfoList.size());
-    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
-    ContainerData containerData =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet()
-            .getContainer(omKeyLocationInfo.getContainerID())
-            .getContainerData();
-    Assert.assertTrue(containerData instanceof KeyValueContainerData);
-    KeyValueContainerData keyValueContainerData =
-        (KeyValueContainerData) containerData;
-    key.close();
-
-    long containerID = omKeyLocationInfo.getContainerID();
-    cluster.shutdownHddsDatanode(
-        cluster.getHddsDatanodes().get(0).getDatanodeDetails());
-    // delete the container db file
-    FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
-    cluster.restartHddsDatanode(
-        cluster.getHddsDatanodes().get(0).getDatanodeDetails(), true);
-    OzoneContainer ozoneContainer =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer();
-    // make sure the missing containerSet is not empty
-    HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
-    Assert.assertTrue(!dispatcher.getMissingContainerSet().isEmpty());
-    Assert
-        .assertTrue(dispatcher.getMissingContainerSet().contains(containerID));
-
-    // write a new key
-    key = objectStore.getVolume(volumeName).getBucket(bucketName)
-        .createKey("ratis", 1024, ReplicationType.RATIS, ReplicationFactor.ONE,
-            new HashMap<>());
-    // First write and flush creates a container in the datanode
-    key.write("ratis1".getBytes());
-    key.flush();
-    groupOutputStream = (KeyOutputStream) key.getOutputStream();
-    locationInfoList = groupOutputStream.getLocationInfoList();
-    Assert.assertEquals(1, locationInfoList.size());
-    omKeyLocationInfo = locationInfoList.get(0);
-    key.close();
-    containerID = omKeyLocationInfo.getContainerID();
-    containerData = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-        .getContainer().getContainerSet()
-        .getContainer(omKeyLocationInfo.getContainerID()).getContainerData();
-    Assert.assertTrue(containerData instanceof KeyValueContainerData);
-    keyValueContainerData = (KeyValueContainerData) containerData;
-    ReferenceCountedDB db = BlockUtils.
-        getDB(keyValueContainerData, conf);
-    byte[] blockCommitSequenceIdKey =
-        StringUtils.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
-
-    // modify the bcsid for the container in the ROCKS DB thereby inducing
-    // corruption
-    db.getStore().put(blockCommitSequenceIdKey, Longs.toByteArray(0));
-    db.decrementReference();
-    // shutdown of dn will take a snapsot which will persist the valid BCSID
-    // recorded in the container2BCSIDMap in ContainerStateMachine
-    cluster.shutdownHddsDatanode(
-        cluster.getHddsDatanodes().get(0).getDatanodeDetails());
-    // after the restart, there will be a mismatch in BCSID of what is recorded
-    // in the and what is there in RockSDB and hence the container would be
-    // marked unhealthy
-    cluster.restartHddsDatanode(
-        cluster.getHddsDatanodes().get(0).getDatanodeDetails(), true);
-    // Make sure the container is marked unhealthy
-    Assert.assertTrue(
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet().getContainer(containerID)
-            .getContainerState()
-            == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
-  }
 }
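
A pattern worth noting in the updated tests: none of them assume the key
lands on datanode 0 any more. Each test first resolves the datanode that
actually hosts the container, then fetches that datanode's state machine for
the key's pipeline:

    // Resolve the hosting datanode and its ContainerStateMachine instead of
    // hard-coding datanode index 0.
    HddsDatanodeService dn =
        TestHelper.getDatanodeService(omKeyLocationInfo, cluster);
    ContainerStateMachine stateMachine = (ContainerStateMachine)
        TestHelper.getStateMachine(dn, omKeyLocationInfo.getPipeline());
    stateMachine.takeSnapshot();
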
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java
new file mode 100644
index 0000000..4ac9aa5
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.
+        HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.
+        HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+        .HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
+        OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
+        OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+
+/**
+ * Tests validation of BCSID on datanode restart.
+ */
+
+public class TestValidateBCSIDOnRestart {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static String volumeName;
+  private static String bucketName;
+  private static XceiverClientManager xceiverClientManager;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.setBoolean(OzoneConfigKeys
+            .OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+            TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
+            TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
+            TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 10, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1,
+            TimeUnit.SECONDS);
+
+    conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
+            + ".client.request.write.timeout", 10, TimeUnit.SECONDS);
+    conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
+            + ".client.request.watch.timeout", 10, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+                    DatanodeRatisServerConfig.RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+            3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+                    DatanodeRatisServerConfig
+                            .RATIS_SERVER_WATCH_REQUEST_TIMEOUT_KEY,
+            10, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "." +
+                    "rpc.request.timeout",
+            3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "." +
+                    "watch.request.timeout",
+            10, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    cluster =
+            MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2)
+                    .setHbInterval(200)
+                    .build();
+    cluster.waitForClusterToBeReady();
+    cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+    xceiverClientManager = new XceiverClientManager(conf);
+    volumeName = "testcontainerstatemachinefailures";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testValidateBCSIDOnDnRestart() throws Exception {
+    OzoneOutputStream key =
+            objectStore.getVolume(volumeName).getBucket(bucketName)
+                    .createKey("ratis", 1024, ReplicationType.RATIS,
+                            ReplicationFactor.ONE, new HashMap<>());
+    // First write and flush creates a container in the datanode
+    key.write("ratis".getBytes());
+    key.flush();
+    key.write("ratis".getBytes());
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+            groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+            cluster);
+    ContainerData containerData =
+            TestHelper.getDatanodeService(omKeyLocationInfo, cluster)
+                    .getDatanodeStateMachine()
+                    .getContainer().getContainerSet()
+                    .getContainer(omKeyLocationInfo.getContainerID())
+                    .getContainerData();
+    Assert.assertTrue(containerData instanceof KeyValueContainerData);
+    KeyValueContainerData keyValueContainerData =
+            (KeyValueContainerData) containerData;
+    key.close();
+
+    long containerID = omKeyLocationInfo.getContainerID();
+    int index = cluster.getHddsDatanodeIndex(dn.getDatanodeDetails());
+    // delete the container db file
+    FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
+
+    HddsDatanodeService dnService = cluster.getHddsDatanodes().get(index);
+    OzoneContainer ozoneContainer =
+            dnService.getDatanodeStateMachine()
+                    .getContainer();
+    ozoneContainer.getContainerSet().removeContainer(containerID);
+    ContainerStateMachine stateMachine =
+            (ContainerStateMachine) TestHelper.getStateMachine(cluster
+                    .getHddsDatanodes().get(index),
+                    omKeyLocationInfo.getPipeline());
+    SimpleStateMachineStorage storage =
+            (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+    stateMachine.takeSnapshot();
+    Path parentPath = storage.findLatestSnapshot().getFile().getPath();
+    stateMachine.buildMissingContainerSet(parentPath.toFile());
+    // a snapshot was taken just above, so the snapshot directory should
+    // contain at least one file
+    Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
+    FileInfo snapshot = storage.findLatestSnapshot().getFile();
+
+    // make sure the missing containerSet is not empty
+    HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
+    Assert.assertTrue(!dispatcher.getMissingContainerSet().isEmpty());
+    Assert
+            .assertTrue(dispatcher.getMissingContainerSet()
+                    .contains(containerID));
+    // write a new key
+    key = objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis", 1024, ReplicationType.RATIS,
+                    ReplicationFactor.ONE, new HashMap<>());
+    // First write and flush creates a container in the datanode
+    key.write("ratis1".getBytes());
+    key.flush();
+    groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    locationInfoList = groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    omKeyLocationInfo = locationInfoList.get(0);
+    key.close();
+    containerID = omKeyLocationInfo.getContainerID();
+    dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+            cluster);
+    containerData = dn.getDatanodeStateMachine()
+            .getContainer().getContainerSet()
+            .getContainer(omKeyLocationInfo.getContainerID())
+            .getContainerData();
+    Assert.assertTrue(containerData instanceof KeyValueContainerData);
+    keyValueContainerData = (KeyValueContainerData) containerData;
+    ReferenceCountedDB db =
+            BlockUtils.getDB(keyValueContainerData, conf);
+    byte[] blockCommitSequenceIdKey =
+            StringUtils
+                    .string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
+
+    // modify the BCSID for the container in RocksDB, thereby inducing
+    // corruption
+    db.getStore().put(blockCommitSequenceIdKey, Longs.toByteArray(0));
+    db.decrementReference();
+    // after the restart, there will be a mismatch between the BCSID recorded
+    // in the snapshot and what is in RocksDB, and hence the container will
+    // be marked unhealthy
+    index = cluster.getHddsDatanodeIndex(dn.getDatanodeDetails());
+    cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
+    // Make sure the container is marked unhealthy
+    Assert.assertTrue(
+            cluster.getHddsDatanodes().get(index)
+                    .getDatanodeStateMachine()
+                    .getContainer().getContainerSet().getContainer(containerID)
+                    .getContainerState()
+                    == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+  }
+}
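
For context on the failure path this test exercises: on restart the state
machine reloads the container-to-BCSID map from its last snapshot and hands
it to HddsDispatcher.buildMissingContainerSetAndValidate, which checks it
against what each container's RocksDB actually holds. A simplified,
hypothetical sketch of that check (illustrative only, not the real dispatcher
code; markContainerUnhealthy is a made-up helper):

    // Hypothetical illustration: a container absent on disk joins the
    // missing set; a container whose persisted BCSID lags the snapshot's
    // value is marked UNHEALTHY.
    for (Map.Entry<Long, Long> entry : container2BCSIDMap.entrySet()) {
      Container container = containerSet.getContainer(entry.getKey());
      if (container == null) {
        missingContainerSet.add(entry.getKey());
      } else if (container.getContainerData().getBlockCommitSequenceId()
          < entry.getValue()) {
        markContainerUnhealthy(container);
      }
    }
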
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index f8646b9..12ffce6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.impl.RaftServerImpl;
@@ -315,4 +316,13 @@ public final class TestHelper {
     return getRaftServerImpl(dn, pipeline).getStateMachine();
   }
 
+  public static HddsDatanodeService getDatanodeService(OmKeyLocationInfo info,
+                                                 MiniOzoneCluster cluster)
+          throws IOException {
+    DatanodeDetails dnDetails = info.getPipeline()
+            .getFirstNode();
+    return cluster.getHddsDatanodes().get(cluster
+            .getHddsDatanodeIndex(dnDetails));
+  }
+
 }
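
Example usage of the new helper, as seen throughout the updated tests: map a
key's location back to the datanode process that hosts the first node of its
pipeline, then operate on that specific datanode.

    HddsDatanodeService dn =
        TestHelper.getDatanodeService(omKeyLocationInfo, cluster);
    int index = cluster.getHddsDatanodeIndex(dn.getDatanodeDetails());
    cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);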


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org