You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by lj...@apache.org on 2019/03/08 03:33:36 UTC

[hadoop] branch ozone-0.4 updated: HDDS-1208. ContainerStateMachine should set chunk data as state machine data for ratis. Contributed by Lokesh Jain.

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/ozone-0.4 by this push:
     new c334bc8  HDDS-1208. ContainerStateMachine should set chunk data as state machine data for ratis. Contributed by Lokesh Jain.
c334bc8 is described below

commit c334bc8baf982e179edbd2302fb41231b20fc590
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Wed Mar 6 17:00:37 2019 +0530

    HDDS-1208. ContainerStateMachine should set chunk data as state machine data for ratis. Contributed by Lokesh Jain.
    
    (cherry picked from commit 129fd5dd18dce0fba48561326a48082888bd6f83)
---
 .../server/ratis/ContainerStateMachine.java        | 73 +++++++---------------
 1 file changed, 23 insertions(+), 50 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index ed7e099..0fc2d0d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -78,7 +78,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -277,7 +276,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   public TransactionContext startTransaction(RaftClientRequest request)
       throws IOException {
     final ContainerCommandRequestProto proto =
-        getRequestProto(request.getMessage().getContent());
+        getContainerCommandRequestProto(request.getMessage().getContent());
     Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
     try (Scope scope = TracingUtil
         .importAndCreateScope(proto.getCmdType().name(), proto.getTraceID())) {
@@ -294,17 +293,6 @@ public class ContainerStateMachine extends BaseStateMachine {
       }
       if (proto.getCmdType() == Type.WriteChunk) {
         final WriteChunkRequestProto write = proto.getWriteChunk();
-        // create the state machine data proto
-        final WriteChunkRequestProto dataWriteChunkProto =
-            WriteChunkRequestProto
-                .newBuilder(write)
-                .build();
-        ContainerCommandRequestProto dataContainerCommandProto =
-            ContainerCommandRequestProto
-                .newBuilder(proto)
-                .setWriteChunk(dataWriteChunkProto)
-                .build();
-
         // create the log entry proto
         final WriteChunkRequestProto commitWriteChunkProto =
             WriteChunkRequestProto.newBuilder()
@@ -323,7 +311,7 @@ public class ContainerStateMachine extends BaseStateMachine {
             .setClientRequest(request)
             .setStateMachine(this)
             .setServerRole(RaftPeerRole.LEADER)
-            .setStateMachineData(dataContainerCommandProto.toByteString())
+            .setStateMachineData(write.getData())
             .setLogData(commitContainerCommandProto.toByteString())
             .build();
       } else {
@@ -341,8 +329,8 @@ public class ContainerStateMachine extends BaseStateMachine {
     return entryProto.getStateMachineEntry().getStateMachineData();
   }
 
-  private ContainerCommandRequestProto getRequestProto(ByteString request)
-      throws InvalidProtocolBufferException {
+  private ContainerCommandRequestProto getContainerCommandRequestProto(
+      ByteString request) throws InvalidProtocolBufferException {
     // TODO: We can avoid creating new builder and set pipeline Id if
     // the client is already sending the pipeline id, then we just have to
     // validate the pipeline Id.
@@ -353,7 +341,9 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   private ContainerCommandResponseProto dispatchCommand(
       ContainerCommandRequestProto requestProto, DispatcherContext context) {
-    LOG.trace("dispatch {}", requestProto);
+    LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}",
+        requestProto.getCmdType(), requestProto.getContainerID(),
+        requestProto.getPipelineID(), requestProto.getTraceID());
     if (isBlockTokenEnabled) {
       try {
         // ServerInterceptors intercepts incoming request and creates ugi.
@@ -432,8 +422,15 @@ public class ContainerStateMachine extends BaseStateMachine {
   public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
     try {
       metrics.incNumWriteStateMachineOps();
-      final ContainerCommandRequestProto requestProto =
-          getRequestProto(getStateMachineData(entry.getStateMachineLogEntry()));
+      ContainerCommandRequestProto requestProto =
+          getContainerCommandRequestProto(
+              entry.getStateMachineLogEntry().getLogData());
+      WriteChunkRequestProto writeChunk =
+          WriteChunkRequestProto.newBuilder(requestProto.getWriteChunk())
+              .setData(getStateMachineData(entry.getStateMachineLogEntry()))
+              .build();
+      requestProto = ContainerCommandRequestProto.newBuilder(requestProto)
+          .setWriteChunk(writeChunk).build();
       Type cmdType = requestProto.getCmdType();
 
       // For only writeChunk, there will be writeStateMachineData call.
@@ -457,7 +454,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     try {
       metrics.incNumReadStateMachineOps();
       final ContainerCommandRequestProto requestProto =
-          getRequestProto(request.getContent());
+          getContainerCommandRequestProto(request.getContent());
       return CompletableFuture.completedFuture(runCommand(requestProto, null));
     } catch (IOException e) {
       metrics.incNumReadStateMachineFails();
@@ -507,34 +504,8 @@ public class ContainerStateMachine extends BaseStateMachine {
    */
   private ByteString getCachedStateMachineData(Long logIndex, long term,
       ContainerCommandRequestProto requestProto) throws ExecutionException {
-    try {
-      return reconstructWriteChunkRequest(
-          stateMachineDataCache.get(logIndex, new Callable<ByteString>() {
-            @Override
-            public ByteString call() throws Exception {
-              return readStateMachineData(requestProto, term, logIndex);
-            }
-          }), requestProto);
-    } catch (ExecutionException e) {
-      throw e;
-    }
-  }
-
-  private ByteString reconstructWriteChunkRequest(ByteString data,
-      ContainerCommandRequestProto requestProto) {
-    WriteChunkRequestProto writeChunkRequestProto =
-        requestProto.getWriteChunk();
-    // reconstruct the write chunk request
-    final WriteChunkRequestProto.Builder dataWriteChunkProto =
-        WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
-            // adding the state machine data
-            .setData(data);
-
-    ContainerCommandRequestProto.Builder newStateMachineProto =
-        ContainerCommandRequestProto.newBuilder(requestProto)
-            .setWriteChunk(dataWriteChunkProto);
-
-    return newStateMachineProto.build().toByteString();
+    return stateMachineDataCache.get(logIndex,
+        () -> readStateMachineData(requestProto, term, logIndex));
   }
 
   /**
@@ -568,7 +539,8 @@ public class ContainerStateMachine extends BaseStateMachine {
     }
     try {
       final ContainerCommandRequestProto requestProto =
-          getRequestProto(entry.getStateMachineLogEntry().getLogData());
+          getContainerCommandRequestProto(
+              entry.getStateMachineLogEntry().getLogData());
       // readStateMachineData should only be called for "write" to Ratis.
       Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
       if (requestProto.getCmdType() == Type.WriteChunk) {
@@ -632,7 +604,8 @@ public class ContainerStateMachine extends BaseStateMachine {
     try {
       metrics.incNumApplyTransactionsOps();
       ContainerCommandRequestProto requestProto =
-          getRequestProto(trx.getStateMachineLogEntry().getLogData());
+          getContainerCommandRequestProto(
+              trx.getStateMachineLogEntry().getLogData());
       Type cmdType = requestProto.getCmdType();
       // Make sure that in write chunk, the user data is not set
       if (cmdType == Type.WriteChunk) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org