You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/03/07 08:59:52 UTC

[hadoop] branch ozone-0.4 updated: HDDS-1184. Parallelization of write chunks in datanodes is broken. Contributed by Shashikant Banerjee.

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/ozone-0.4 by this push:
     new 8655abb  HDDS-1184. Parallelization of write chunks in datanodes is broken. Contributed by Shashikant Banerjee.
8655abb is described below

commit 8655abb353811eba349dd0703d702deb02532242
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Wed Mar 6 10:00:16 2019 +0530

    HDDS-1184. Parallelization of write chunks in datanodes is broken. Contributed by Shashikant Banerjee.
    
    (cherry picked from commit 62e89dc275f120f54967744393e2ddde15575096)
---
 .../src/main/proto/DatanodeContainerProtocol.proto |  1 +
 .../server/ratis/ContainerStateMachine.java        | 60 ++++++++++++----------
 2 files changed, 33 insertions(+), 28 deletions(-)

diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 3b78835..7396eb3 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -143,6 +143,7 @@ enum Result {
   BCSID_MISMATCH = 38;
   CONTAINER_NOT_OPEN = 39;
   CONTAINER_MISSING = 40;
+  BLOCK_TOKEN_VERIFICATION_FAILED = 41;
 }
 
 /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 759f957..ed7e099 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
 import io.opentracing.Scope;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -350,13 +352,20 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   private ContainerCommandResponseProto dispatchCommand(
-      ContainerCommandRequestProto requestProto,
-      DispatcherContext context) throws IOException {
+      ContainerCommandRequestProto requestProto, DispatcherContext context) {
     LOG.trace("dispatch {}", requestProto);
-    if(isBlockTokenEnabled) {
-      // ServerInterceptors intercepts incoming request and creates ugi.
-      tokenVerifier.verify(UserGroupInformation.getCurrentUser()
-          .getShortUserName(), requestProto.getEncodedToken());
+    if (isBlockTokenEnabled) {
+      try {
+        // ServerInterceptors intercepts incoming request and creates ugi.
+        tokenVerifier
+            .verify(UserGroupInformation.getCurrentUser().getShortUserName(),
+                requestProto.getEncodedToken());
+      } catch (IOException ioe) {
+        StorageContainerException sce = new StorageContainerException(
+            "Block token verification failed. " + ioe.getMessage(), ioe,
+            ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
+        return ContainerUtils.logAndReturnError(LOG, sce, requestProto);
+      }
     }
     ContainerCommandResponseProto response =
         dispatcher.dispatch(requestProto, context);
@@ -365,7 +374,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   private Message runCommand(ContainerCommandRequestProto requestProto,
-      DispatcherContext context) throws IOException {
+      DispatcherContext context) {
     return dispatchCommand(requestProto, context)::toByteString;
   }
 
@@ -394,14 +403,10 @@ public class ContainerStateMachine extends BaseStateMachine {
             .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
             .setCreateContainerSet(createContainerSet)
             .build();
-    CompletableFuture<Message> writeChunkFuture;
-    try {
-      Message msg = runCommand(requestProto, context);
-      writeChunkFuture = CompletableFuture
-          .supplyAsync(() -> msg, chunkExecutor);
-    }catch(IOException ie) {
-      writeChunkFuture = completeExceptionally(ie);
-    }
+    // ensure the write chunk happens asynchronously in writeChunkExecutor pool
+    // thread.
+    CompletableFuture<Message> writeChunkFuture = CompletableFuture
+        .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
 
     writeChunkFutureMap.put(entryIndex, writeChunkFuture);
     LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
@@ -567,16 +572,18 @@ public class ContainerStateMachine extends BaseStateMachine {
       // readStateMachineData should only be called for "write" to Ratis.
       Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
       if (requestProto.getCmdType() == Type.WriteChunk) {
-        CompletableFuture<ByteString> future = new CompletableFuture<>();
-        return future.supplyAsync(() -> {
+        final CompletableFuture<ByteString> future = new CompletableFuture<>();
+        CompletableFuture.supplyAsync(() -> {
           try {
-            return getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
-                requestProto);
+            future.complete(
+                getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
+                    requestProto));
           } catch (ExecutionException e) {
             future.completeExceptionally(e);
-            return null;
           }
+          return future;
         }, chunkExecutor);
+        return future;
       } else {
         throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
             + " cannot have state machine data");
@@ -627,7 +634,6 @@ public class ContainerStateMachine extends BaseStateMachine {
       ContainerCommandRequestProto requestProto =
           getRequestProto(trx.getStateMachineLogEntry().getLogData());
       Type cmdType = requestProto.getCmdType();
-      CompletableFuture<Message> future;
       // Make sure that in write chunk, the user data is not set
       if (cmdType == Type.WriteChunk) {
         Preconditions
@@ -638,13 +644,11 @@ public class ContainerStateMachine extends BaseStateMachine {
       if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
         builder.setCreateContainerSet(createContainerSet);
       }
-      try {
-        Message msg = runCommand(requestProto, builder.build());
-        future = CompletableFuture.supplyAsync(() -> msg,
-            getCommandExecutor(requestProto));
-      } catch (IOException ie) {
-        future = completeExceptionally(ie);
-      }
+      // Ensure the command gets executed in a separate thread than
+      // stateMachineUpdater thread which is calling applyTransaction here.
+      CompletableFuture<Message> future = CompletableFuture
+          .supplyAsync(() -> runCommand(requestProto, builder.build()),
+              getCommandExecutor(requestProto));
 
       lastIndex = index;
       future.thenAccept(m -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org