You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2020/06/26 10:08:06 UTC
[hadoop-ozone] branch master updated: HDDS-3018. Fix
TestContainerStateMachineFailures.java (#556)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 025fc54 HDDS-3018. Fix TestContainerStateMachineFailures.java (#556)
025fc54 is described below
commit 025fc547f3c8092ba687311c25b322698804f14a
Author: bshashikant <sh...@apache.org>
AuthorDate: Fri Jun 26 15:37:51 2020 +0530
HDDS-3018. Fix TestContainerStateMachineFailures.java (#556)
---
.../server/ratis/ContainerStateMachine.java | 14 +-
.../rpc/TestContainerStateMachineFailures.java | 419 +++++++++------------
.../client/rpc/TestValidateBCSIDOnRestart.java | 258 +++++++++++++
.../apache/hadoop/ozone/container/TestHelper.java | 10 +
4 files changed, 459 insertions(+), 242 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 96675c8..859cd20 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -237,19 +237,25 @@ public class ContainerStateMachine extends BaseStateMachine {
// initialize the dispatcher with snapshot so that it build the missing
// container list
+ buildMissingContainerSet(snapshotFile);
+ return last.getIndex();
+ }
+
+ @VisibleForTesting
+ public void buildMissingContainerSet(File snapshotFile) throws IOException {
+ // initialize the dispatcher with snapshot so that it build the missing
+ // container list
try (FileInputStream fin = new FileInputStream(snapshotFile)) {
ContainerProtos.Container2BCSIDMapProto proto =
- ContainerProtos.Container2BCSIDMapProto
- .parseFrom(fin);
+ ContainerProtos.Container2BCSIDMapProto
+ .parseFrom(fin);
// read the created containers list from the snapshot file and add it to
// the container2BCSIDMap here.
// container2BCSIDMap will further grow as and when containers get created
container2BCSIDMap.putAll(proto.getContainer2BCSIDMap());
dispatcher.buildMissingContainerSetAndValidate(container2BCSIDMap);
}
- return last.getIndex();
}
-
/**
* As a part of taking snapshot with Ratis StateMachine, it will persist
* the existing container set in the snapshotFile.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 17875e9..1e44562 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -17,19 +17,20 @@
package org.apache.hadoop.ozone.client.rpc;
-import com.google.common.primitives.Longs;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -43,22 +44,18 @@ import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
-import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.
+ ContainerStateMachine;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
@@ -68,6 +65,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -81,8 +79,6 @@ import static org.apache.hadoop.hdds.HddsConfigKeys
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
ContainerDataProto.State.UNHEALTHY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
- HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
@@ -94,7 +90,6 @@ import static org.junit.Assert.fail;
* Tests the containerStateMachine failure handling.
*/
-@Ignore
public class TestContainerStateMachineFailures {
private static MiniOzoneCluster cluster;
@@ -103,7 +98,6 @@ public class TestContainerStateMachineFailures {
private static ObjectStore objectStore;
private static String volumeName;
private static String bucketName;
- private static String path;
private static XceiverClientManager xceiverClientManager;
/**
@@ -114,32 +108,46 @@ public class TestContainerStateMachineFailures {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
- path = GenericTestUtils
- .getTempPath(TestContainerStateMachineFailures.class.getSimpleName());
- File baseDir = new File(path);
- baseDir.mkdirs();
-
-
+ conf.setBoolean(OzoneConfigKeys.
+ OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
- conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
- conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1,
TimeUnit.SECONDS);
+
conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
- + ".client.request.write.timeout", 30, TimeUnit.SECONDS);
+ + ".client.request.write.timeout", 10, TimeUnit.SECONDS);
conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
- + ".client.request.watch.timeout", 30, TimeUnit.SECONDS);
+ + ".client.request.watch.timeout", 10, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+ DatanodeRatisServerConfig.RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+ 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+ DatanodeRatisServerConfig.
+ RATIS_SERVER_WATCH_REQUEST_TIMEOUT_KEY,
+ 10, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
+ "rpc.request.timeout",
+ 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
+ "watch.request.timeout",
+ 10, TimeUnit.SECONDS);
conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
conf.setQuietMode(false);
cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10).setHbInterval(200)
.build();
cluster.waitForClusterToBeReady();
+ cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getRpcClient(conf);
objectStore = client.getObjectStore();
@@ -163,80 +171,85 @@ public class TestContainerStateMachineFailures {
@Test
public void testContainerStateMachineFailures() throws Exception {
OzoneOutputStream key =
- objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey("ratis", 1024, ReplicationType.RATIS,
- ReplicationFactor.ONE, new HashMap<>());
+ objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("ratis", 1024, ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
byte[] testData = "ratis".getBytes();
// First write and flush creates a container in the datanode
key.write(testData);
key.flush();
key.write(testData);
KeyOutputStream groupOutputStream =
- (KeyOutputStream) key.getOutputStream();
+ (KeyOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
- groupOutputStream.getLocationInfoList();
+ groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+ HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+ cluster);
// delete the container dir
- FileUtil.fullyDelete(new File(
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet()
- .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
- .getContainerPath()));
+ FileUtil.fullyDelete(new File(dn.getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID()).
+ getContainerData().getContainerPath()));
try {
// there is only 1 datanode in the pipeline, the pipeline will be closed
// and allocation to new pipeline will fail as there is no other dn in
// the cluster
key.close();
- } catch(IOException ioe) {
- Assert.assertTrue(ioe instanceof OMException);
+ } catch (IOException ioe) {
}
long containerID = omKeyLocationInfo.getContainerID();
// Make sure the container is marked unhealthy
Assert.assertTrue(
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet()
- .getContainer(containerID)
- .getContainerState()
- == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
- OzoneContainer ozoneContainer = cluster.getHddsDatanodes().get(0)
- .getDatanodeStateMachine().getContainer();
- // make sure the missing containerSet is empty
- HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
- Assert.assertTrue(dispatcher.getMissingContainerSet().isEmpty());
+ dn.getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(containerID)
+ .getContainerState()
+ == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+ OzoneContainer ozoneContainer;
// restart the hdds datanode, container should not in the regular set
- cluster.restartHddsDatanode(0, true);
- ozoneContainer = cluster.getHddsDatanodes().get(0)
- .getDatanodeStateMachine().getContainer();
- Assert
- .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
+ OzoneConfiguration config = dn.getConf();
+ final String dir = config.get(OzoneConfigKeys.
+ DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR)
+ + UUID.randomUUID();
+ config.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
+ int index = cluster.getHddsDatanodeIndex(dn.getDatanodeDetails());
+ cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
+ ozoneContainer = cluster.getHddsDatanodes().get(index)
+ .getDatanodeStateMachine().getContainer();
+ Assert.assertNull(ozoneContainer.getContainerSet().
+ getContainer(containerID));
}
@Test
public void testUnhealthyContainer() throws Exception {
OzoneOutputStream key =
- objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey("ratis", 1024, ReplicationType.RATIS,
- ReplicationFactor.ONE, new HashMap<>());
+ objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("ratis", 1024, ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
// First write and flush creates a container in the datanode
key.write("ratis".getBytes());
key.flush();
key.write("ratis".getBytes());
- KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+ KeyOutputStream groupOutputStream = (KeyOutputStream) key
+ .getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
- groupOutputStream.getLocationInfoList();
+ groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+ HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+ cluster);
ContainerData containerData =
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet()
- .getContainer(omKeyLocationInfo.getContainerID())
- .getContainerData();
+ dn.getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID())
+ .getContainerData();
Assert.assertTrue(containerData instanceof KeyValueContainerData);
KeyValueContainerData keyValueContainerData =
- (KeyValueContainerData) containerData;
+ (KeyValueContainerData) containerData;
// delete the container db file
FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
try {
@@ -244,78 +257,90 @@ public class TestContainerStateMachineFailures {
// and allocation to new pipeline will fail as there is no other dn in
// the cluster
key.close();
- } catch(IOException ioe) {
- Assert.assertTrue(ioe instanceof OMException);
+ } catch (IOException ioe) {
}
long containerID = omKeyLocationInfo.getContainerID();
// Make sure the container is marked unhealthy
Assert.assertTrue(
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet().getContainer(containerID)
- .getContainerState()
- == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+ dn.getDatanodeStateMachine()
+ .getContainer().getContainerSet().getContainer(containerID)
+ .getContainerState()
+ == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
// Check metadata in the .container file
File containerFile = new File(keyValueContainerData.getMetadataPath(),
- containerID + OzoneConsts.CONTAINER_EXTENSION);
+ containerID + OzoneConsts.CONTAINER_EXTENSION);
keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
- .readContainerFile(containerFile);
+ .readContainerFile(containerFile);
assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
+ OzoneConfiguration config = dn.getConf();
+ final String dir = config.get(OzoneConfigKeys.
+ DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR)
+ + UUID.randomUUID();
+ config.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
+ int index = cluster.getHddsDatanodeIndex(dn.getDatanodeDetails());
// restart the hdds datanode and see if the container is listed in the
// in the missing container set and not in the regular set
- cluster.restartHddsDatanode(0, true);
+ cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
// make sure the container state is still marked unhealthy after restart
keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
- .readContainerFile(containerFile);
+ .readContainerFile(containerFile);
assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
OzoneContainer ozoneContainer;
- ozoneContainer = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer();
- HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
+ HddsDatanodeService dnService = cluster.getHddsDatanodes().get(index);
+ ozoneContainer = dnService
+ .getDatanodeStateMachine().getContainer();
+ HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer
+ .getDispatcher();
ContainerProtos.ContainerCommandRequestProto.Builder request =
- ContainerProtos.ContainerCommandRequestProto.newBuilder();
+ ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.CloseContainer);
request.setContainerID(containerID);
request.setCloseContainer(
- ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
- request.setDatanodeUuid(
- cluster.getHddsDatanodes().get(0).getDatanodeDetails().getUuidString());
+ ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+ request.setDatanodeUuid(dnService.getDatanodeDetails().getUuidString());
Assert.assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY,
- dispatcher.dispatch(request.build(), null).getResult());
+ dispatcher.dispatch(request.build(), null)
+ .getResult());
}
@Test
public void testApplyTransactionFailure() throws Exception {
OzoneOutputStream key =
- objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey("ratis", 1024, ReplicationType.RATIS,
- ReplicationFactor.ONE, new HashMap<>());
+ objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("ratis", 1024, ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
// First write and flush creates a container in the datanode
key.write("ratis".getBytes());
key.flush();
key.write("ratis".getBytes());
- KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+ KeyOutputStream groupOutputStream = (KeyOutputStream) key.
+ getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
- groupOutputStream.getLocationInfoList();
+ groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
- ContainerData containerData =
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet()
- .getContainer(omKeyLocationInfo.getContainerID())
- .getContainerData();
+ HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+ cluster);
+ int index = cluster.getHddsDatanodeIndex(dn.getDatanodeDetails());
+ ContainerData containerData = dn.getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID())
+ .getContainerData();
Assert.assertTrue(containerData instanceof KeyValueContainerData);
KeyValueContainerData keyValueContainerData =
- (KeyValueContainerData) containerData;
+ (KeyValueContainerData) containerData;
key.close();
ContainerStateMachine stateMachine =
- (ContainerStateMachine) TestHelper.getStateMachine(cluster);
+ (ContainerStateMachine) TestHelper.getStateMachine(cluster.
+ getHddsDatanodes().get(index), omKeyLocationInfo.getPipeline());
SimpleStateMachineStorage storage =
- (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+ (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+ stateMachine.takeSnapshot();
Path parentPath = storage.findLatestSnapshot().getFile().getPath();
// Since the snapshot threshold is set to 1, since there are
// applyTransactions, we should see snapshots
@@ -326,36 +351,30 @@ public class TestContainerStateMachineFailures {
// delete the container db file
FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
Pipeline pipeline = cluster.getStorageContainerLocationClient()
- .getContainerWithPipeline(containerID).getPipeline();
+ .getContainerWithPipeline(containerID).getPipeline();
XceiverClientSpi xceiverClient =
- xceiverClientManager.acquireClient(pipeline);
+ xceiverClientManager.acquireClient(pipeline);
ContainerProtos.ContainerCommandRequestProto.Builder request =
- ContainerProtos.ContainerCommandRequestProto.newBuilder();
+ ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
request.setCmdType(ContainerProtos.Type.CloseContainer);
request.setContainerID(containerID);
request.setCloseContainer(
- ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+ ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
// close container transaction will fail over Ratis and will initiate
// a pipeline close action
- // Since the applyTransaction failure is propagated to Ratis,
- // stateMachineUpdater will it exception while taking the next snapshot
- // and should shutdown the RaftServerImpl. The client request will fail
- // with RaftRetryFailureException.
try {
xceiverClient.sendCommand(request.build());
Assert.fail("Expected exception not thrown");
} catch (IOException e) {
- Assert.assertTrue(HddsClientUtils
- .checkForException(e) instanceof RaftRetryFailureException);
+ // Exception should be thrown
}
// Make sure the container is marked unhealthy
- Assert.assertTrue(
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet().getContainer(containerID)
- .getContainerState()
- == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+ Assert.assertTrue(dn.getDatanodeStateMachine()
+ .getContainer().getContainerSet().getContainer(containerID)
+ .getContainerState()
+ == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
try {
// try to take a new snapshot, ideally it should just fail
stateMachine.takeSnapshot();
@@ -369,59 +388,61 @@ public class TestContainerStateMachineFailures {
@Test
public void testApplyTransactionIdempotencyWithClosedContainer()
- throws Exception {
+ throws Exception {
OzoneOutputStream key =
- objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey("ratis", 1024, ReplicationType.RATIS,
- ReplicationFactor.ONE, new HashMap<>());
+ objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("ratis", 1024, ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
// First write and flush creates a container in the datanode
key.write("ratis".getBytes());
key.flush();
key.write("ratis".getBytes());
KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
- groupOutputStream.getLocationInfoList();
+ groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
- ContainerData containerData =
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet()
- .getContainer(omKeyLocationInfo.getContainerID())
- .getContainerData();
+ HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+ cluster);
+ ContainerData containerData = dn.getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID())
+ .getContainerData();
Assert.assertTrue(containerData instanceof KeyValueContainerData);
key.close();
ContainerStateMachine stateMachine =
- (ContainerStateMachine) TestHelper.getStateMachine(cluster);
+ (ContainerStateMachine) TestHelper.getStateMachine(dn,
+ omKeyLocationInfo.getPipeline());
SimpleStateMachineStorage storage =
- (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+ (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
Path parentPath = storage.findLatestSnapshot().getFile().getPath();
- // Since the snapshot threshold is set to 1, since there are
- // applyTransactions, we should see snapshots
+ stateMachine.takeSnapshot();
Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
FileInfo snapshot = storage.findLatestSnapshot().getFile();
Assert.assertNotNull(snapshot);
long containerID = omKeyLocationInfo.getContainerID();
Pipeline pipeline = cluster.getStorageContainerLocationClient()
- .getContainerWithPipeline(containerID).getPipeline();
+ .getContainerWithPipeline(containerID).getPipeline();
XceiverClientSpi xceiverClient =
- xceiverClientManager.acquireClient(pipeline);
+ xceiverClientManager.acquireClient(pipeline);
ContainerProtos.ContainerCommandRequestProto.Builder request =
- ContainerProtos.ContainerCommandRequestProto.newBuilder();
+ ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
request.setCmdType(ContainerProtos.Type.CloseContainer);
request.setContainerID(containerID);
request.setCloseContainer(
- ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+ ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
try {
xceiverClient.sendCommand(request.build());
} catch (IOException e) {
Assert.fail("Exception should not be thrown");
}
Assert.assertTrue(
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet().getContainer(containerID)
- .getContainerState()
- == ContainerProtos.ContainerDataProto.State.CLOSED);
+ TestHelper.getDatanodeService(omKeyLocationInfo, cluster)
+ .getDatanodeStateMachine()
+ .getContainer().getContainerSet().getContainer(containerID)
+ .getContainerState()
+ == ContainerProtos.ContainerDataProto.State.CLOSED);
Assert.assertTrue(stateMachine.isStateMachineHealthy());
try {
stateMachine.takeSnapshot();
@@ -440,32 +461,37 @@ public class TestContainerStateMachineFailures {
// closed here.
@Test
public void testWriteStateMachineDataIdempotencyWithClosedContainer()
- throws Exception {
+ throws Exception {
OzoneOutputStream key =
- objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey("ratis-1", 1024, ReplicationType.RATIS,
- ReplicationFactor.ONE, new HashMap<>());
+ objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("ratis-1", 1024, ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
// First write and flush creates a container in the datanode
key.write("ratis".getBytes());
key.flush();
key.write("ratis".getBytes());
- KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+ KeyOutputStream groupOutputStream = (KeyOutputStream) key
+ .getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
- groupOutputStream.getLocationInfoList();
+ groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+ HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+ cluster);
ContainerData containerData =
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet()
- .getContainer(omKeyLocationInfo.getContainerID())
- .getContainerData();
+ dn.getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID())
+ .getContainerData();
Assert.assertTrue(containerData instanceof KeyValueContainerData);
key.close();
ContainerStateMachine stateMachine =
- (ContainerStateMachine) TestHelper.getStateMachine(cluster);
+ (ContainerStateMachine) TestHelper.getStateMachine(dn,
+ omKeyLocationInfo.getPipeline());
SimpleStateMachineStorage storage =
- (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+ (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
Path parentPath = storage.findLatestSnapshot().getFile().getPath();
+ stateMachine.takeSnapshot();
// Since the snapshot threshold is set to 1, since there are
// applyTransactions, we should see snapshots
Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
@@ -473,21 +499,22 @@ public class TestContainerStateMachineFailures {
Assert.assertNotNull(snapshot);
long containerID = omKeyLocationInfo.getContainerID();
Pipeline pipeline = cluster.getStorageContainerLocationClient()
- .getContainerWithPipeline(containerID).getPipeline();
+ .getContainerWithPipeline(containerID).getPipeline();
XceiverClientSpi xceiverClient =
- xceiverClientManager.acquireClient(pipeline);
+ xceiverClientManager.acquireClient(pipeline);
CountDownLatch latch = new CountDownLatch(100);
int count = 0;
AtomicInteger failCount = new AtomicInteger(0);
Runnable r1 = () -> {
try {
ContainerProtos.ContainerCommandRequestProto.Builder request =
- ContainerProtos.ContainerCommandRequestProto.newBuilder();
+ ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
request.setCmdType(ContainerProtos.Type.CloseContainer);
request.setContainerID(containerID);
request.setCloseContainer(
- ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+ ContainerProtos.CloseContainerRequestProto.
+ getDefaultInstance());
xceiverClient.sendCommand(request.build());
} catch (IOException e) {
failCount.incrementAndGet();
@@ -496,13 +523,13 @@ public class TestContainerStateMachineFailures {
Runnable r2 = () -> {
try {
xceiverClient.sendCommand(ContainerTestHelper
- .getWriteChunkRequest(pipeline, omKeyLocationInfo.getBlockID(),
- 1024, new Random().nextInt(), null));
+ .getWriteChunkRequest(pipeline, omKeyLocationInfo.getBlockID(),
+ 1024, new Random().nextInt(), null));
latch.countDown();
} catch (IOException e) {
latch.countDown();
if (!(HddsClientUtils
- .checkForException(e) instanceof ContainerNotOpenException)) {
+ .checkForException(e) instanceof ContainerNotOpenException)) {
failCount.incrementAndGet();
}
}
@@ -529,10 +556,11 @@ public class TestContainerStateMachineFailures {
fail("testWriteStateMachineDataIdempotencyWithClosedContainer failed");
}
Assert.assertTrue(
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet().getContainer(containerID)
- .getContainerState()
- == ContainerProtos.ContainerDataProto.State.CLOSED);
+ TestHelper.getDatanodeService(omKeyLocationInfo, cluster)
+ .getDatanodeStateMachine()
+ .getContainer().getContainerSet().getContainer(containerID)
+ .getContainerState()
+ == ContainerProtos.ContainerDataProto.State.CLOSED);
Assert.assertTrue(stateMachine.isStateMachineHealthy());
try {
stateMachine.takeSnapshot();
@@ -543,89 +571,4 @@ public class TestContainerStateMachineFailures {
Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath()));
}
-
- @Test
- public void testValidateBCSIDOnDnRestart() throws Exception {
- OzoneOutputStream key =
- objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey("ratis", 1024, ReplicationType.RATIS,
- ReplicationFactor.ONE, new HashMap<>());
- // First write and flush creates a container in the datanode
- key.write("ratis".getBytes());
- key.flush();
- key.write("ratis".getBytes());
- KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
- List<OmKeyLocationInfo> locationInfoList =
- groupOutputStream.getLocationInfoList();
- Assert.assertEquals(1, locationInfoList.size());
- OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
- ContainerData containerData =
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet()
- .getContainer(omKeyLocationInfo.getContainerID())
- .getContainerData();
- Assert.assertTrue(containerData instanceof KeyValueContainerData);
- KeyValueContainerData keyValueContainerData =
- (KeyValueContainerData) containerData;
- key.close();
-
- long containerID = omKeyLocationInfo.getContainerID();
- cluster.shutdownHddsDatanode(
- cluster.getHddsDatanodes().get(0).getDatanodeDetails());
- // delete the container db file
- FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
- cluster.restartHddsDatanode(
- cluster.getHddsDatanodes().get(0).getDatanodeDetails(), true);
- OzoneContainer ozoneContainer =
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer();
- // make sure the missing containerSet is not empty
- HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
- Assert.assertTrue(!dispatcher.getMissingContainerSet().isEmpty());
- Assert
- .assertTrue(dispatcher.getMissingContainerSet().contains(containerID));
-
- // write a new key
- key = objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey("ratis", 1024, ReplicationType.RATIS, ReplicationFactor.ONE,
- new HashMap<>());
- // First write and flush creates a container in the datanode
- key.write("ratis1".getBytes());
- key.flush();
- groupOutputStream = (KeyOutputStream) key.getOutputStream();
- locationInfoList = groupOutputStream.getLocationInfoList();
- Assert.assertEquals(1, locationInfoList.size());
- omKeyLocationInfo = locationInfoList.get(0);
- key.close();
- containerID = omKeyLocationInfo.getContainerID();
- containerData = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet()
- .getContainer(omKeyLocationInfo.getContainerID()).getContainerData();
- Assert.assertTrue(containerData instanceof KeyValueContainerData);
- keyValueContainerData = (KeyValueContainerData) containerData;
- ReferenceCountedDB db = BlockUtils.
- getDB(keyValueContainerData, conf);
- byte[] blockCommitSequenceIdKey =
- StringUtils.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
-
- // modify the bcsid for the container in the ROCKS DB thereby inducing
- // corruption
- db.getStore().put(blockCommitSequenceIdKey, Longs.toByteArray(0));
- db.decrementReference();
- // shutdown of dn will take a snapsot which will persist the valid BCSID
- // recorded in the container2BCSIDMap in ContainerStateMachine
- cluster.shutdownHddsDatanode(
- cluster.getHddsDatanodes().get(0).getDatanodeDetails());
- // after the restart, there will be a mismatch in BCSID of what is recorded
- // in the and what is there in RockSDB and hence the container would be
- // marked unhealthy
- cluster.restartHddsDatanode(
- cluster.getHddsDatanodes().get(0).getDatanodeDetails(), true);
- // Make sure the container is marked unhealthy
- Assert.assertTrue(
- cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
- .getContainer().getContainerSet().getContainer(containerID)
- .getContainerState()
- == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
- }
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java
new file mode 100644
index 0000000..4ac9aa5
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.
+ HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.
+ HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
+ OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
+ OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+
+/**
+ * Tests validation of BCSID (block commit sequence id) on datanode restart.
+ */
+
+public class TestValidateBCSIDOnRestart {
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf;
+ private static OzoneClient client;
+ private static ObjectStore objectStore;
+ private static String volumeName;
+ private static String bucketName;
+ private static XceiverClientManager xceiverClientManager;
+
+ /**
+ * Create a MiniOzoneCluster for testing.
+ *
+ * @throws IOException
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ conf = new OzoneConfiguration();
+ conf.setBoolean(OzoneConfigKeys.
+ OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+ conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 10, TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1,
+ TimeUnit.SECONDS);
+
+ conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
+ + ".client.request.write.timeout", 10, TimeUnit.SECONDS);
+ conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY
+ + ".client.request.watch.timeout", 10, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+ DatanodeRatisServerConfig.RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+ 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+ DatanodeRatisServerConfig.
+ RATIS_SERVER_WATCH_REQUEST_TIMEOUT_KEY,
+ 10, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "." +
+ "rpc.request.timeout",
+ 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY + "." +
+ "watch.request.timeout",
+ 10, TimeUnit.SECONDS);
+ conf.setQuietMode(false);
+ cluster =
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2).
+ setHbInterval(200)
+ .build();
+ cluster.waitForClusterToBeReady();
+ cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
+ //the easiest way to create an open container is creating a key
+ client = OzoneClientFactory.getRpcClient(conf);
+ objectStore = client.getObjectStore();
+ xceiverClientManager = new XceiverClientManager(conf);
+ volumeName = "testcontainerstatemachinefailures";
+ bucketName = volumeName;
+ objectStore.createVolume(volumeName);
+ objectStore.getVolume(volumeName).createBucket(bucketName);
+ }
+
+ /**
+ * Shutdown MiniOzoneCluster.
+ */
+ @AfterClass
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testValidateBCSIDOnDnRestart() throws Exception {
+ OzoneOutputStream key =
+ objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("ratis", 1024, ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
+ // First write and flush creates a container in the datanode
+ key.write("ratis".getBytes());
+ key.flush();
+ key.write("ratis".getBytes());
+ KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+ List<OmKeyLocationInfo> locationInfoList =
+ groupOutputStream.getLocationInfoList();
+ Assert.assertEquals(1, locationInfoList.size());
+ OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+ HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+ cluster);
+ ContainerData containerData =
+ TestHelper.getDatanodeService(omKeyLocationInfo, cluster)
+ .getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID())
+ .getContainerData();
+ Assert.assertTrue(containerData instanceof KeyValueContainerData);
+ KeyValueContainerData keyValueContainerData =
+ (KeyValueContainerData) containerData;
+ key.close();
+
+ long containerID = omKeyLocationInfo.getContainerID();
+ int index = cluster.getHddsDatanodeIndex(dn.getDatanodeDetails());
+ // delete the container db file
+ FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
+
+ HddsDatanodeService dnService = cluster.getHddsDatanodes().get(index);
+ OzoneContainer ozoneContainer =
+ dnService.getDatanodeStateMachine()
+ .getContainer();
+ ozoneContainer.getContainerSet().removeContainer(containerID);
+ ContainerStateMachine stateMachine =
+ (ContainerStateMachine) TestHelper.getStateMachine(cluster.
+ getHddsDatanodes().get(index),
+ omKeyLocationInfo.getPipeline());
+ SimpleStateMachineStorage storage =
+ (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+ stateMachine.takeSnapshot();
+ Path parentPath = storage.findLatestSnapshot().getFile().getPath();
+ stateMachine.buildMissingContainerSet(parentPath.toFile());
+ // Since the snapshot threshold is set to 1 and there are
+ // applyTransactions, we should see snapshots.
+ Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
+ FileInfo snapshot = storage.findLatestSnapshot().getFile();
+
+ // make sure the missing containerSet is not empty
+ HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
+ Assert.assertTrue(!dispatcher.getMissingContainerSet().isEmpty());
+ Assert
+ .assertTrue(dispatcher.getMissingContainerSet()
+ .contains(containerID));
+ // write a new key
+ key = objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("ratis", 1024, ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
+ // First write and flush creates a container in the datanode
+ key.write("ratis1".getBytes());
+ key.flush();
+ groupOutputStream = (KeyOutputStream) key.getOutputStream();
+ locationInfoList = groupOutputStream.getLocationInfoList();
+ Assert.assertEquals(1, locationInfoList.size());
+ omKeyLocationInfo = locationInfoList.get(0);
+ key.close();
+ containerID = omKeyLocationInfo.getContainerID();
+ dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+ cluster);
+ containerData = dn.getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID())
+ .getContainerData();
+ Assert.assertTrue(containerData instanceof KeyValueContainerData);
+ keyValueContainerData = (KeyValueContainerData) containerData;
+ ReferenceCountedDB db = BlockUtils.
+ getDB(keyValueContainerData, conf);
+ byte[] blockCommitSequenceIdKey =
+ StringUtils.
+ string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
+
+ // modify the bcsid for the container in the ROCKS DB thereby inducing
+ // corruption
+ db.getStore().put(blockCommitSequenceIdKey, Longs.toByteArray(0));
+ db.decrementReference();
+ // after the restart, there will be a mismatch between the BCSID recorded
+ // in the snapshot and what is there in RocksDB, and hence the container
+ // would be marked unhealthy
+ index = cluster.getHddsDatanodeIndex(dn.getDatanodeDetails());
+ cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
+ // Make sure the container is marked unhealthy
+ Assert.assertTrue(
+ cluster.getHddsDatanodes().get(index)
+ .getDatanodeStateMachine()
+ .getContainer().getContainerSet().getContainer(containerID)
+ .getContainerState()
+ == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index f8646b9..12ffce6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.impl.RaftServerImpl;
@@ -315,4 +316,13 @@ public final class TestHelper {
return getRaftServerImpl(dn, pipeline).getStateMachine();
}
+ public static HddsDatanodeService getDatanodeService(OmKeyLocationInfo info,
+ MiniOzoneCluster cluster)
+ throws IOException {
+ DatanodeDetails dnDetails = info.getPipeline().
+ getFirstNode();
+ return cluster.getHddsDatanodes().get(cluster.
+ getHddsDatanodeIndex(dnDetails));
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org