You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/10/02 04:41:19 UTC

[hadoop] branch trunk updated: HDDS-2210. ContainerStateMachine should not be marked unhealthy if applyTransaction fails with closed container exception (#1552).

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 41440ec  HDDS-2210. ContainerStateMachine should not be marked unhealthy if applyTransaction fails with closed container exception(#1552).
41440ec is described below

commit 41440ec890348f95bf7f10b5ced737e41dd6c3d3
Author: bshashikant <sh...@apache.org>
AuthorDate: Wed Oct 2 10:11:01 2019 +0530

    HDDS-2210. ContainerStateMachine should not be marked unhealthy if applyTransaction fails with closed container exception(#1552).
---
 .../server/ratis/ContainerStateMachine.java        | 20 +++++--
 .../rpc/TestContainerStateMachineFailures.java     | 65 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 0535763..7b638a3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -150,7 +150,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final Cache<Long, ByteString> stateMachineDataCache;
   private final boolean isBlockTokenEnabled;
   private final TokenVerifier tokenVerifier;
-  private final AtomicBoolean isStateMachineHealthy;
+  private final AtomicBoolean stateMachineHealthy;
 
   private final Semaphore applyTransactionSemaphore;
   /**
@@ -190,7 +190,7 @@ public class ContainerStateMachine extends BaseStateMachine {
         ScmConfigKeys.
             DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
     applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
-    isStateMachineHealthy = new AtomicBoolean(true);
+    stateMachineHealthy = new AtomicBoolean(true);
     this.executors = new ExecutorService[numContainerOpExecutors];
     for (int i = 0; i < numContainerOpExecutors; i++) {
       final int index = i;
@@ -271,11 +271,15 @@ public class ContainerStateMachine extends BaseStateMachine {
     IOUtils.write(builder.build().toByteArray(), out);
   }
 
+  public boolean isStateMachineHealthy() {
+    return stateMachineHealthy.get();
+  }
+
   @Override
   public long takeSnapshot() throws IOException {
     TermIndex ti = getLastAppliedTermIndex();
     long startTime = Time.monotonicNow();
-    if (!isStateMachineHealthy.get()) {
+    if (!isStateMachineHealthy()) {
       String msg =
           "Failed to take snapshot " + " for " + gid + " as the stateMachine"
               + " is unhealthy. The last applied index is at " + ti;
@@ -731,7 +735,11 @@ public class ContainerStateMachine extends BaseStateMachine {
           metrics.incPipelineLatency(cmdType,
               Time.monotonicNowNanos() - startTime);
         }
-        if (r.getResult() != ContainerProtos.Result.SUCCESS) {
+        // ignore close container exception while marking the stateMachine
+        // unhealthy
+        if (r.getResult() != ContainerProtos.Result.SUCCESS
+            && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+            && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
           StorageContainerException sce =
               new StorageContainerException(r.getMessage(), r.getResult());
           LOG.error(
@@ -744,7 +752,7 @@ public class ContainerStateMachine extends BaseStateMachine {
           // caught in stateMachineUpdater in Ratis and ratis server will
           // shutdown.
           applyTransactionFuture.completeExceptionally(sce);
-          isStateMachineHealthy.compareAndSet(true, false);
+          stateMachineHealthy.compareAndSet(true, false);
           ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
         } else {
           LOG.debug(
@@ -759,7 +767,7 @@ public class ContainerStateMachine extends BaseStateMachine {
           // add the entry to the applyTransactionCompletionMap only if the
           // stateMachine is healthy i.e, there has been no applyTransaction
           // failures before.
-          if (isStateMachineHealthy.get()) {
+          if (isStateMachineHealthy()) {
             final Long previous = applyTransactionCompletionMap
                 .put(index, trx.getLogEntry().getTerm());
             Preconditions.checkState(previous == null);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 7b90815..9ac45b8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -354,6 +354,71 @@ public class TestContainerStateMachineFailures {
   }
 
   @Test
+  public void testApplyTransactionIdempotencyWithClosedContainer()
+      throws Exception {
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis", 1024, ReplicationType.RATIS,
+                ReplicationFactor.ONE, new HashMap<>());
+    // First write and flush creates a container in the datanode
+    key.write("ratis".getBytes());
+    key.flush();
+    key.write("ratis".getBytes());
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    ContainerData containerData =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet()
+            .getContainer(omKeyLocationInfo.getContainerID())
+            .getContainerData();
+    Assert.assertTrue(containerData instanceof KeyValueContainerData);
+    key.close();
+    ContainerStateMachine stateMachine =
+        (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
+    SimpleStateMachineStorage storage =
+        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+    Path parentPath = storage.findLatestSnapshot().getFile().getPath();
+    // Since the snapshot threshold is set to 1, since there are
+    // applyTransactions, we should see snapshots
+    Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
+    FileInfo snapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertNotNull(snapshot);
+    long containerID = omKeyLocationInfo.getContainerID();
+    Pipeline pipeline = cluster.getStorageContainerLocationClient()
+        .getContainerWithPipeline(containerID).getPipeline();
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+    request.setCmdType(ContainerProtos.Type.CloseContainer);
+    request.setContainerID(containerID);
+    request.setCloseContainer(
+        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    try {
+      xceiverClient.sendCommand(request.build());
+    } catch (IOException e) {
+      Assert.fail("Exception should not be thrown");
+    }
+    Assert.assertTrue(
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet().getContainer(containerID)
+            .getContainerState()
+            == ContainerProtos.ContainerDataProto.State.CLOSED);
+    Assert.assertTrue(stateMachine.isStateMachineHealthy());
+    try {
+      stateMachine.takeSnapshot();
+    } catch (IOException ioe) {
+      Assert.fail("Exception should not be thrown");
+    }
+    FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath()));
+  }
+
+  @Test
   public void testValidateBCSIDOnDnRestart() throws Exception {
     OzoneOutputStream key =
         objectStore.getVolume(volumeName).getBucket(bucketName)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org