You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by lj...@apache.org on 2019/09/17 11:19:34 UTC

[hadoop] branch trunk updated: HDDS-2117. ContainerStateMachine#writeStateMachineData times out. (#1430)

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7f90731  HDDS-2117. ContainerStateMachine#writeStateMachineData times out. (#1430)
7f90731 is described below

commit 7f9073132dcc9db157a6792635d2ed099f2ef0d2
Author: bshashikant <sh...@apache.org>
AuthorDate: Tue Sep 17 16:49:25 2019 +0530

    HDDS-2117. ContainerStateMachine#writeStateMachineData times out. (#1430)
---
 .../container/common/impl/HddsDispatcher.java      |  9 ++++---
 .../server/ratis/ContainerStateMachine.java        | 31 ++++++++++++++++------
 2 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index e95d899..37e19bc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -236,9 +236,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
         if (container2BCSIDMap != null) {
           // adds this container to list of containers created in the pipeline
           // with initial BCSID recorded as 0.
-          Preconditions
-              .checkArgument(!container2BCSIDMap.containsKey(containerID));
-          container2BCSIDMap.put(containerID, Long.valueOf(0));
+          container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
         }
         container = getContainer(containerID);
       }
@@ -290,6 +288,11 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
       // state here.
 
       Result result = responseProto.getResult();
+      if (cmdType == ContainerProtos.Type.CreateContainer
+          && result == Result.SUCCESS && dispatcherContext != null) {
+        Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
+        container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
+      }
       if (!HddsUtils.isReadOnly(msg) && !canIgnoreException(result)) {
         // If the container is open/closing and the container operation
         // has failed, it should be first marked unhealthy and the initiate the
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index cee9741..c6ab0a1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -435,13 +435,20 @@ public class ContainerStateMachine extends BaseStateMachine {
             .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
             .setContainer2BCSIDMap(container2BCSIDMap)
             .build();
+    CompletableFuture<Message> raftFuture = new CompletableFuture<>();
     // ensure the write chunk happens asynchronously in writeChunkExecutor pool
     // thread.
     CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
-        CompletableFuture.supplyAsync(() ->
-            runCommand(requestProto, context), chunkExecutor);
-
-    CompletableFuture<Message> raftFuture = new CompletableFuture<>();
+        CompletableFuture.supplyAsync(() -> {
+          try {
+            return runCommand(requestProto, context);
+          } catch (Exception e) {
+            LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId"
+                + write.getBlockID() + " logIndex " + entryIndex + " chunkName "
+                + write.getChunkData().getChunkName() + e);
+            raftFuture.completeExceptionally(e);
+            throw e;
+          }}, chunkExecutor);
 
     writeChunkFutureMap.put(entryIndex, writeChunkFuture);
     LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
@@ -698,7 +705,7 @@ public class ContainerStateMachine extends BaseStateMachine {
             .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
       }
       if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile
-          || cmdType == Type.PutBlock) {
+          || cmdType == Type.PutBlock || cmdType == Type.CreateContainer) {
         builder.setContainer2BCSIDMap(container2BCSIDMap);
       }
       CompletableFuture<Message> applyTransactionFuture =
@@ -706,9 +713,17 @@ public class ContainerStateMachine extends BaseStateMachine {
       // Ensure the command gets executed in a separate thread than
       // stateMachineUpdater thread which is calling applyTransaction here.
       CompletableFuture<ContainerCommandResponseProto> future =
-          CompletableFuture.supplyAsync(
-              () -> runCommand(requestProto, builder.build()),
-              getCommandExecutor(requestProto));
+          CompletableFuture.supplyAsync(() -> {
+            try {
+              return runCommand(requestProto, builder.build());
+            } catch (Exception e) {
+              LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
+                      + "{} exception {}", gid, requestProto.getCmdType(),
+                  index, e);
+              applyTransactionFuture.completeExceptionally(e);
+              throw e;
+            }
+          }, getCommandExecutor(requestProto));
       future.thenApply(r -> {
         if (trx.getServerRole() == RaftPeerRole.LEADER) {
           long startTime = (long) trx.getStateMachineContext();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org