Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2018/08/07 09:34:16 UTC

hadoop git commit: HDDS-230. ContainerStateMachine should implement readStateMachineData api to read data from Containers if required during replication. Contributed by Mukul Kumar Singh.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2e4e02b4d -> 900c0e114


HDDS-230. ContainerStateMachine should implement readStateMachineData api to read data from Containers if required during replication. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/900c0e11
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/900c0e11
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/900c0e11

Branch: refs/heads/trunk
Commit: 900c0e114f391f4dbf21a0e08a63c2cf22659eb7
Parents: 2e4e02b
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Tue Aug 7 15:03:14 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Tue Aug 7 15:03:14 2018 +0530

----------------------------------------------------------------------
 .../server/ratis/ContainerStateMachine.java     | 142 ++++++++++++++++---
 .../server/ratis/XceiverServerRatis.java        |  10 +-
 .../org/apache/hadoop/ozone/om/OMMetrics.java   |   2 +-
 hadoop-project/pom.xml                          |   2 +-
 4 files changed, 129 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
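
Before the per-file diffs: the heart of this change is the readStateMachineData hook. Ratis may evict the bulky chunk payload (the "state machine data") from its log cache; when the leader later replicates that log entry to a lagging follower, it calls back into the state machine, which re-reads the chunk from the container on disk. A minimal, self-contained sketch of that callback shape follows. This is plain Java with hypothetical LogEntry and ChunkStore types standing in for the Ratis LogEntryProto and the HDDS dispatcher; it illustrates the pattern, not the actual APIs:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical stand-ins for the Ratis log entry and the HDDS chunk layer.
final class LogEntry {
    final long index;
    final byte[] stateMachineData; // may be empty if evicted from the cache
    LogEntry(long index, byte[] data) { this.index = index; this.stateMachineData = data; }
    LogEntry withData(byte[] data) { return new LogEntry(index, data); }
}

interface ChunkStore {
    byte[] readChunk(long index); // re-reads the payload from the container on disk
}

class SketchStateMachine {
    private final ChunkStore chunkStore;
    private final Executor chunkExecutor; // shared with writeChunk, hence the rename below

    SketchStateMachine(ChunkStore chunkStore, Executor chunkExecutor) {
        this.chunkStore = chunkStore;
        this.chunkExecutor = chunkExecutor;
    }

    // Called by the leader when an entry's payload is no longer cached.
    CompletableFuture<LogEntry> readStateMachineData(LogEntry entry) {
        if (entry.stateMachineData.length > 0) {
            // Payload still cached: nothing to do.
            return CompletableFuture.completedFuture(entry);
        }
        // Re-read the chunk on the executor so the Raft server thread is not blocked.
        return CompletableFuture.supplyAsync(
            () -> entry.withData(chunkStore.readChunk(entry.index)), chunkExecutor);
    }
}
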


http://git-wip-us.apache.org/repos/asf/hadoop/blob/900c0e11/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index c0dd0ba..15e991a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -19,20 +19,26 @@
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.shaded.com.google.protobuf
     .InvalidProtocolBufferException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .WriteChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ReadChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ReadChunkResponseProto;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
@@ -96,16 +102,16 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final SimpleStateMachineStorage storage
       = new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
-  private ThreadPoolExecutor writeChunkExecutor;
+  private ThreadPoolExecutor chunkExecutor;
   private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       writeChunkFutureMap;
   private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       createContainerFutureMap;
 
   ContainerStateMachine(ContainerDispatcher dispatcher,
-      ThreadPoolExecutor writeChunkExecutor) {
+      ThreadPoolExecutor chunkExecutor) {
     this.dispatcher = dispatcher;
-    this.writeChunkExecutor = writeChunkExecutor;
+    this.chunkExecutor = chunkExecutor;
     this.writeChunkFutureMap = new ConcurrentHashMap<>();
     this.createContainerFutureMap = new ConcurrentHashMap<>();
   }
@@ -117,9 +123,9 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   @Override
   public void initialize(
-      RaftPeerId id, RaftProperties properties, RaftStorage raftStorage)
+      RaftServer server, RaftGroupId id, RaftStorage raftStorage)
       throws IOException {
-    super.initialize(id, properties, raftStorage);
+    super.initialize(server, id, raftStorage);
     storage.init(raftStorage);
     //  TODO handle snapshots
 
@@ -134,13 +140,13 @@ public class ContainerStateMachine extends BaseStateMachine {
         getRequestProto(request.getMessage().getContent());
 
     final SMLogEntryProto log;
-    if (proto.getCmdType() == ContainerProtos.Type.WriteChunk) {
+    if (proto.getCmdType() == Type.WriteChunk) {
       final WriteChunkRequestProto write = proto.getWriteChunk();
       // create the state machine data proto
       final WriteChunkRequestProto dataWriteChunkProto =
           WriteChunkRequestProto
               .newBuilder(write)
-              .setStage(ContainerProtos.Stage.WRITE_DATA)
+              .setStage(Stage.WRITE_DATA)
               .build();
       ContainerCommandRequestProto dataContainerCommandProto =
           ContainerCommandRequestProto
@@ -155,7 +161,7 @@ public class ContainerStateMachine extends BaseStateMachine {
               .setChunkData(write.getChunkData())
               // skipping the data field as it is
               // already set in statemachine data proto
-              .setStage(ContainerProtos.Stage.COMMIT_DATA)
+              .setStage(Stage.COMMIT_DATA)
               .build();
       ContainerCommandRequestProto commitContainerCommandProto =
           ContainerCommandRequestProto
@@ -167,7 +173,7 @@ public class ContainerStateMachine extends BaseStateMachine {
           .setData(commitContainerCommandProto.toByteString())
           .setStateMachineData(dataContainerCommandProto.toByteString())
           .build();
-    } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+    } else if (proto.getCmdType() == Type.CreateContainer) {
       log = SMLogEntryProto.newBuilder()
           .setData(request.getMessage().getContent())
           .setStateMachineData(request.getMessage().getContent())
@@ -185,11 +191,16 @@ public class ContainerStateMachine extends BaseStateMachine {
     return ContainerCommandRequestProto.parseFrom(request);
   }
 
-  private Message runCommand(ContainerCommandRequestProto requestProto) {
+  private ContainerCommandResponseProto dispatchCommand(
+      ContainerCommandRequestProto requestProto) {
     LOG.trace("dispatch {}", requestProto);
     ContainerCommandResponseProto response = dispatcher.dispatch(requestProto);
     LOG.trace("response {}", response);
-    return () -> response.toByteString();
+    return response;
+  }
+
+  private Message runCommand(ContainerCommandRequestProto requestProto) {
+    return dispatchCommand(requestProto)::toByteString;
   }
 
   private CompletableFuture<Message> handleWriteChunk(
@@ -201,10 +212,10 @@ public class ContainerStateMachine extends BaseStateMachine {
     CompletableFuture<Message> writeChunkFuture;
     if (future != null) {
       writeChunkFuture = future.thenApplyAsync(
-          v -> runCommand(requestProto), writeChunkExecutor);
+          v -> runCommand(requestProto), chunkExecutor);
     } else {
       writeChunkFuture = CompletableFuture.supplyAsync(
-          () -> runCommand(requestProto), writeChunkExecutor);
+          () -> runCommand(requestProto), chunkExecutor);
     }
     writeChunkFutureMap.put(entryIndex, writeChunkFuture);
     return writeChunkFuture;
@@ -227,7 +238,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     try {
       final ContainerCommandRequestProto requestProto =
           getRequestProto(entry.getSmLogEntry().getStateMachineData());
-      ContainerProtos.Type cmdType = requestProto.getCmdType();
+      Type cmdType = requestProto.getCmdType();
       switch (cmdType) {
       case CreateContainer:
         return handleCreateContainer(requestProto);
@@ -253,6 +264,97 @@ public class ContainerStateMachine extends BaseStateMachine {
     }
   }
 
+  private LogEntryProto readStateMachineData(SMLogEntryProto smLogEntryProto,
+      ContainerCommandRequestProto requestProto) {
+    WriteChunkRequestProto writeChunkRequestProto =
+        requestProto.getWriteChunk();
+    // Assert that the stored log entry is for COMMIT_DATA; the WRITE_DATA
+    // stage is written through writeStateMachineData.
+    Preconditions.checkArgument(writeChunkRequestProto.getStage()
+        == Stage.COMMIT_DATA);
+
+    // prepare the chunk to be read
+    ReadChunkRequestProto.Builder readChunkRequestProto =
+        ReadChunkRequestProto.newBuilder()
+            .setBlockID(writeChunkRequestProto.getBlockID())
+            .setChunkData(writeChunkRequestProto.getChunkData());
+    ContainerCommandRequestProto dataContainerCommandProto =
+        ContainerCommandRequestProto.newBuilder(requestProto)
+            .setCmdType(Type.ReadChunk)
+            .setReadChunk(readChunkRequestProto)
+            .build();
+
+    // read the chunk
+    ContainerCommandResponseProto response =
+        dispatchCommand(dataContainerCommandProto);
+    ReadChunkResponseProto responseProto = response.getReadChunk();
+
+    // assert that the response has data in it.
+    Preconditions.checkNotNull(responseProto.getData());
+
+    // reconstruct the write chunk request
+    final WriteChunkRequestProto.Builder dataWriteChunkProto =
+        WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
+            // adding the state machine data
+            .setData(responseProto.getData())
+            .setStage(Stage.WRITE_DATA);
+
+    ContainerCommandRequestProto.Builder newStateMachineProto =
+        ContainerCommandRequestProto.newBuilder(requestProto)
+            .setWriteChunk(dataWriteChunkProto);
+
+    return recreateLogEntryProto(smLogEntryProto,
+        newStateMachineProto.build().toByteString());
+  }
+
+  private LogEntryProto recreateLogEntryProto(SMLogEntryProto smLogEntryProto,
+      ByteString stateMachineData) {
+    // recreate the log entry
+    final SMLogEntryProto log =
+        SMLogEntryProto.newBuilder(smLogEntryProto)
+            .setStateMachineData(stateMachineData)
+            .build();
+    return LogEntryProto.newBuilder().setSmLogEntry(log).build();
+  }
+
+  /*
+   * This API is used by the leader while appending log entries to a
+   * follower. It allows the leader to read the state machine data back
+   * from the state machine implementation in case the cached state
+   * machine data has been evicted.
+   */
+  @Override
+  public CompletableFuture<LogEntryProto> readStateMachineData(
+      LogEntryProto entry) {
+    SMLogEntryProto smLogEntryProto = entry.getSmLogEntry();
+    if (!smLogEntryProto.getStateMachineData().isEmpty()) {
+      return CompletableFuture.completedFuture(entry);
+    }
+
+    try {
+      final ContainerCommandRequestProto requestProto =
+          getRequestProto(entry.getSmLogEntry().getData());
+      // readStateMachineData should only be called for "write" to Ratis.
+      Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
+
+      if (requestProto.getCmdType() == Type.WriteChunk) {
+        return CompletableFuture.supplyAsync(() ->
+                readStateMachineData(smLogEntryProto, requestProto),
+            chunkExecutor);
+      } else if (requestProto.getCmdType() == Type.CreateContainer) {
+        LogEntryProto log =
+            recreateLogEntryProto(smLogEntryProto, requestProto.toByteString());
+        return CompletableFuture.completedFuture(log);
+      } else {
+        throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
+            + " cannot have state machine data");
+      }
+    } catch (Exception e) {
+      LOG.error("Unable to read stateMachineData", e);
+      return completeExceptionally(e);
+    }
+  }
+
   /*
    * ApplyTransaction calls in Ratis are sequential.
    */
@@ -261,9 +363,9 @@ public class ContainerStateMachine extends BaseStateMachine {
     try {
       ContainerCommandRequestProto requestProto =
           getRequestProto(trx.getSMLogEntry().getData());
-      ContainerProtos.Type cmdType = requestProto.getCmdType();
+      Type cmdType = requestProto.getCmdType();
 
-      if (cmdType == ContainerProtos.Type.WriteChunk) {
+      if (cmdType == Type.WriteChunk) {
         WriteChunkRequestProto write = requestProto.getWriteChunk();
         // the data field has already been removed in start Transaction
         Preconditions.checkArgument(!write.hasData());
@@ -274,7 +376,7 @@ public class ContainerStateMachine extends BaseStateMachine {
                 CompletableFuture.completedFuture(runCommand(requestProto)));
       } else {
         Message message = runCommand(requestProto);
-        if (cmdType == ContainerProtos.Type.CreateContainer) {
+        if (cmdType == Type.CreateContainer) {
           long containerID = requestProto.getContainerID();
           createContainerFutureMap.remove(containerID).complete(message);
         }

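Two smaller points in the ContainerStateMachine diff above are easy to miss. First, runCommand is now expressed as dispatchCommand(requestProto)::toByteString, a bound method reference that captures the dispatch result once. Second, handleWriteChunk chains each write behind the pending container-creation future when there is one, and otherwise schedules it directly on the chunk executor. The chaining pattern in miniature (JDK-only, hypothetical names; a sketch of the idiom rather than the HDDS code):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FutureChainDemo {
    private final ConcurrentHashMap<Long, CompletableFuture<Void>> createFutures =
        new ConcurrentHashMap<>();
    private final ExecutorService chunkExecutor = Executors.newFixedThreadPool(4);

    CompletableFuture<String> writeChunk(long containerId, Runnable write) {
        CompletableFuture<Void> create = createFutures.get(containerId);
        // If the container is still being created, run the write after that
        // future completes; otherwise schedule the write directly.
        return (create != null)
            ? create.thenApplyAsync(v -> { write.run(); return "ok"; }, chunkExecutor)
            : CompletableFuture.supplyAsync(() -> { write.run(); return "ok"; }, chunkExecutor);
    }
}
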
http://git-wip-us.apache.org/repos/asf/hadoop/blob/900c0e11/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index b9c7cae..723b94ae 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -72,7 +72,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
 
   private final int port;
   private final RaftServer server;
-  private ThreadPoolExecutor writeChunkExecutor;
+  private ThreadPoolExecutor chunkExecutor;
 
   private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
       ContainerDispatcher dispatcher, Configuration conf) throws IOException {
@@ -117,13 +117,13 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     setRequestTimeout(serverProperties, clientRequestTimeout,
         serverRequestTimeout);
 
-    writeChunkExecutor =
+    chunkExecutor =
         new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
             100, TimeUnit.SECONDS,
             new ArrayBlockingQueue<>(1024),
             new ThreadPoolExecutor.CallerRunsPolicy());
     ContainerStateMachine stateMachine =
-        new ContainerStateMachine(dispatcher, writeChunkExecutor);
+        new ContainerStateMachine(dispatcher, chunkExecutor);
     this.server = RaftServer.newBuilder()
         .setServerId(RatisHelper.toRaftPeerId(dd))
         .setGroup(RatisHelper.emptyRaftGroup())
@@ -225,14 +225,14 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   public void start() throws IOException {
     LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
         server.getId(), getIPCPort());
-    writeChunkExecutor.prestartAllCoreThreads();
+    chunkExecutor.prestartAllCoreThreads();
     server.start();
   }
 
   @Override
   public void stop() {
     try {
-      writeChunkExecutor.shutdown();
+      chunkExecutor.shutdown();
       server.close();
     } catch (IOException e) {
       throw new RuntimeException(e);

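The executor XceiverServerRatis hands to the state machine (only renamed in this diff, from writeChunkExecutor to chunkExecutor, since it now serves reads as well as writes) is a common backpressure configuration: a fixed-size pool over a bounded queue, with CallerRunsPolicy so a saturated pool pushes work back onto the submitting thread instead of rejecting it. A standalone JDK-only sketch of the same configuration:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ChunkExecutorDemo {
    public static void main(String[] args) {
        int threads = 10; // stands in for numWriteChunkThreads
        ThreadPoolExecutor chunkExecutor = new ThreadPoolExecutor(
            threads, threads,            // fixed-size pool (core == max)
            100, TimeUnit.SECONDS,       // keep-alive; inert while core == max
            new ArrayBlockingQueue<>(1024),
            // When the queue is full, the submitting thread runs the task
            // itself, throttling producers instead of dropping work.
            new ThreadPoolExecutor.CallerRunsPolicy());
        chunkExecutor.prestartAllCoreThreads();
        chunkExecutor.execute(() -> System.out.println("chunk task"));
        chunkExecutor.shutdown();
    }
}
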
http://git-wip-us.apache.org/repos/asf/hadoop/blob/900c0e11/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 2d04452..b8cfc97 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -88,7 +88,7 @@ public class OMMetrics {
   public static OMMetrics create() {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     return ms.register(SOURCE_NAME,
-        "Oozne Manager Metrics",
+        "Ozone Manager Metrics",
         new OMMetrics());
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/900c0e11/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 387a3da..7603842 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -97,7 +97,7 @@
     <ldap-api.version>1.0.0-M33</ldap-api.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.1.1-alpha-d7d7061-SNAPSHOT</ratis.version>
+    <ratis.version>0.3.0-c242317-SNAPSHOT</ratis.version>
     <jcache.version>1.0-alpha-1</jcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <hikari.version>2.4.12</hikari.version>

