You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/10/02 04:41:19 UTC
[hadoop] branch trunk updated: HDDS-2210. ContainerStateMachine
should not be marked unhealthy if applyTransaction fails with closed
container exception(#1552).
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 41440ec HDDS-2210. ContainerStateMachine should not be marked unhealthy if applyTransaction fails with closed container exception(#1552).
41440ec is described below
commit 41440ec890348f95bf7f10b5ced737e41dd6c3d3
Author: bshashikant <sh...@apache.org>
AuthorDate: Wed Oct 2 10:11:01 2019 +0530
HDDS-2210. ContainerStateMachine should not be marked unhealthy if applyTransaction fails with closed container exception(#1552).
---
.../server/ratis/ContainerStateMachine.java | 20 +++++--
.../rpc/TestContainerStateMachineFailures.java | 65 ++++++++++++++++++++++
2 files changed, 79 insertions(+), 6 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 0535763..7b638a3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -150,7 +150,7 @@ public class ContainerStateMachine extends BaseStateMachine {
private final Cache<Long, ByteString> stateMachineDataCache;
private final boolean isBlockTokenEnabled;
private final TokenVerifier tokenVerifier;
- private final AtomicBoolean isStateMachineHealthy;
+ private final AtomicBoolean stateMachineHealthy;
private final Semaphore applyTransactionSemaphore;
/**
@@ -190,7 +190,7 @@ public class ContainerStateMachine extends BaseStateMachine {
ScmConfigKeys.
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
- isStateMachineHealthy = new AtomicBoolean(true);
+ stateMachineHealthy = new AtomicBoolean(true);
this.executors = new ExecutorService[numContainerOpExecutors];
for (int i = 0; i < numContainerOpExecutors; i++) {
final int index = i;
@@ -271,11 +271,15 @@ public class ContainerStateMachine extends BaseStateMachine {
IOUtils.write(builder.build().toByteArray(), out);
}
+ public boolean isStateMachineHealthy() {
+ return stateMachineHealthy.get();
+ }
+
@Override
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
long startTime = Time.monotonicNow();
- if (!isStateMachineHealthy.get()) {
+ if (!isStateMachineHealthy()) {
String msg =
"Failed to take snapshot " + " for " + gid + " as the stateMachine"
+ " is unhealthy. The last applied index is at " + ti;
@@ -731,7 +735,11 @@ public class ContainerStateMachine extends BaseStateMachine {
metrics.incPipelineLatency(cmdType,
Time.monotonicNowNanos() - startTime);
}
- if (r.getResult() != ContainerProtos.Result.SUCCESS) {
+ // Do not mark the stateMachine unhealthy when the failure is only a
+ // closed-container result (CONTAINER_NOT_OPEN or CLOSED_CONTAINER_IO).
+ if (r.getResult() != ContainerProtos.Result.SUCCESS
+ && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+ && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(
@@ -744,7 +752,7 @@ public class ContainerStateMachine extends BaseStateMachine {
// caught in stateMachineUpdater in Ratis and ratis server will
// shutdown.
applyTransactionFuture.completeExceptionally(sce);
- isStateMachineHealthy.compareAndSet(true, false);
+ stateMachineHealthy.compareAndSet(true, false);
ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
} else {
LOG.debug(
@@ -759,7 +767,7 @@ public class ContainerStateMachine extends BaseStateMachine {
// add the entry to the applyTransactionCompletionMap only if the
// stateMachine is healthy i.e, there has been no applyTransaction
// failures before.
- if (isStateMachineHealthy.get()) {
+ if (isStateMachineHealthy()) {
final Long previous = applyTransactionCompletionMap
.put(index, trx.getLogEntry().getTerm());
Preconditions.checkState(previous == null);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 7b90815..9ac45b8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -354,6 +354,71 @@ public class TestContainerStateMachineFailures {
}
@Test
+ public void testApplyTransactionIdempotencyWithClosedContainer()
+ throws Exception {
+ OzoneOutputStream key =
+ objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("ratis", 1024, ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
+ // The first write and flush create a container in the datanode
+ key.write("ratis".getBytes());
+ key.flush();
+ key.write("ratis".getBytes());
+ KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+ List<OmKeyLocationInfo> locationInfoList =
+ groupOutputStream.getLocationInfoList();
+ Assert.assertEquals(1, locationInfoList.size());
+ OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+ ContainerData containerData =
+ cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID())
+ .getContainerData();
+ Assert.assertTrue(containerData instanceof KeyValueContainerData);
+ key.close();
+ ContainerStateMachine stateMachine =
+ (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
+ SimpleStateMachineStorage storage =
+ (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+ Path parentPath = storage.findLatestSnapshot().getFile().getPath();
+ // Since the snapshot threshold is set to 1 and there have been
+ // applyTransactions, we should see snapshots.
+ Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
+ FileInfo snapshot = storage.findLatestSnapshot().getFile();
+ Assert.assertNotNull(snapshot);
+ long containerID = omKeyLocationInfo.getContainerID();
+ Pipeline pipeline = cluster.getStorageContainerLocationClient()
+ .getContainerWithPipeline(containerID).getPipeline();
+ XceiverClientSpi xceiverClient =
+ xceiverClientManager.acquireClient(pipeline);
+ ContainerProtos.ContainerCommandRequestProto.Builder request =
+ ContainerProtos.ContainerCommandRequestProto.newBuilder();
+ request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+ request.setCmdType(ContainerProtos.Type.CloseContainer);
+ request.setContainerID(containerID);
+ request.setCloseContainer(
+ ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+ try {
+ xceiverClient.sendCommand(request.build());
+ } catch (IOException e) {
+ Assert.fail("Exception should not be thrown");
+ }
+ Assert.assertTrue(
+ cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+ .getContainer().getContainerSet().getContainer(containerID)
+ .getContainerState()
+ == ContainerProtos.ContainerDataProto.State.CLOSED);
+ Assert.assertTrue(stateMachine.isStateMachineHealthy());
+ try {
+ stateMachine.takeSnapshot();
+ } catch (IOException ioe) {
+ Assert.fail("Exception should not be thrown");
+ }
+ FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
+ Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath()));
+ }
+
+ @Test
public void testValidateBCSIDOnDnRestart() throws Exception {
OzoneOutputStream key =
objectStore.getVolume(volumeName).getBucket(bucketName)
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org