You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2018/01/23 06:03:38 UTC

hadoop git commit: HDFS-13024. Ozone: ContainerStateMachine should synchronize operations between createContainer and writeChunk. Contributed by Mukul Kumar Singh.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 b1c8c1de7 -> a6c2b6694


HDFS-13024. Ozone: ContainerStateMachine should synchronize operations between createContainer and writeChunk. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a6c2b669
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a6c2b669
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a6c2b669

Branch: refs/heads/HDFS-7240
Commit: a6c2b6694b6607bdff8465bf6641716711e166c6
Parents: b1c8c1d
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Tue Jan 23 11:19:46 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Tue Jan 23 11:19:46 2018 +0530

----------------------------------------------------------------------
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |   4 +
 .../org/apache/hadoop/scm/ScmConfigKeys.java    |   6 +-
 .../server/ratis/ContainerStateMachine.java     | 115 +++++++++++++++----
 .../server/ratis/XceiverServerRatis.java        |  36 ++++--
 .../src/main/resources/ozone-default.xml        |  12 +-
 5 files changed, 135 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6c2b669/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8059b5e..22a4787 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -229,6 +229,10 @@ public final class OzoneConfigKeys {
       = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY;
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT
       = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY;
+  public static final int DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT;
   public static final int DFS_CONTAINER_CHUNK_MAX_SIZE
       = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
   public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6c2b669/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index b41db77..6f5c873 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -57,7 +57,11 @@ public final class ScmConfigKeys {
   public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
       "dfs.container.ratis.segment.size";
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
-      128 * 1024 * 1024;
+      1 * 1024 * 1024 * 1024;
+  public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY =
+      "dfs.container.ratis.segment.preallocated.size";
+  public static final int
+      DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT = 128 * 1024 * 1024;
 
   // TODO : this is copied from OzoneConsts, may need to move to a better place
   public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6c2b669/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index a4517b3..c96cc5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -45,29 +46,61 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ArrayBlockingQueue;
 
-/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.  */
+/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
+ *
+ * The stateMachine is responsible for handling different types of container
+ * requests. The container requests can be divided into readonly and write
+ * requests.
+ *
+ * Read only requests are classified in
+ * {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly}
+ * and these readonly requests are replied from the
+ * {@link #query(RaftClientRequest)}
+ *
+ * The write requests can be divided into requests with user data
+ * (WriteChunkRequest) and other request without user data.
+ *
+ * In order to optimize the write throughput, the writeChunk request is
+ * processed in 2 phases. The 2 phases are divided in
+ * {@link #startTransaction(RaftClientRequest)}, in the first phase the user
+ * data is written directly into the state machine via
+ * {@link #writeStateMachineData} and in the second phase the
+ * transaction is committed via {@link #applyTransaction(TransactionContext)}
+ *
+ * For the requests with no stateMachine data, the transaction is directly
+ * committed through
+ * {@link #applyTransaction(TransactionContext)}
+ *
+ * There are 2 ordering operations which are enforced right now in the code:
+ * 1) Write chunk operations are executed after the create container operation;
+ * the write chunk operation will fail otherwise, as the container still hasn't
+ * been created. Hence the create container operation has been split in the
+ * {@link #startTransaction(RaftClientRequest)}, this will help in synchronizing
+ * the calls in {@link #writeStateMachineData}
+ *
+ * 2) Write chunk commit operation is executed after write chunk state machine
+ * operation. This will ensure that commit operation is sync'd with the state
+ * machine operation.
+ * */
 public class ContainerStateMachine extends BaseStateMachine {
   static final Logger LOG = LoggerFactory.getLogger(
       ContainerStateMachine.class);
   private final SimpleStateMachineStorage storage
       = new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
-  private final ThreadPoolExecutor writeChunkExecutor;
+  private ThreadPoolExecutor writeChunkExecutor;
   private final ConcurrentHashMap<String, CompletableFuture<Message>>
-                                                                writeChunkMap;
+      writeChunkFutureMap;
+  private final ConcurrentHashMap<String, CompletableFuture<Message>>
+      createContainerFutureMap;
 
   ContainerStateMachine(ContainerDispatcher dispatcher,
-      int numWriteChunkThreads) {
+      ThreadPoolExecutor writeChunkExecutor) {
     this.dispatcher = dispatcher;
-    writeChunkMap = new ConcurrentHashMap<>();
-    writeChunkExecutor =
-        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
-            60, TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(1024),
-            new ThreadPoolExecutor.CallerRunsPolicy());
+    this.writeChunkExecutor = writeChunkExecutor;
+    this.writeChunkFutureMap = new ConcurrentHashMap<>();
+    this.createContainerFutureMap = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -81,13 +114,13 @@ public class ContainerStateMachine extends BaseStateMachine {
       throws IOException {
     super.initialize(id, properties, raftStorage);
     storage.init(raftStorage);
-    writeChunkExecutor.prestartAllCoreThreads();
     //  TODO handle snapshots
 
     // TODO: Add a flag that tells you that initialize has been called.
     // Check with Ratis if this feature is done in Ratis.
   }
 
+  @Override
   public TransactionContext startTransaction(RaftClientRequest request)
       throws IOException {
     final ContainerCommandRequestProto proto =
@@ -110,8 +143,12 @@ public class ContainerStateMachine extends BaseStateMachine {
 
       // create the log entry proto
       final WriteChunkRequestProto commitWriteChunkProto =
-          WriteChunkRequestProto
-              .newBuilder(write)
+          WriteChunkRequestProto.newBuilder()
+              .setPipeline(write.getPipeline())
+              .setKeyName(write.getKeyName())
+              .setChunkData(write.getChunkData())
+              // skipping the data field as it is
+              // already set in statemachine data proto
               .setStage(ContainerProtos.Stage.COMMIT_DATA)
               .build();
       ContainerCommandRequestProto commitContainerCommandProto =
@@ -124,6 +161,11 @@ public class ContainerStateMachine extends BaseStateMachine {
           .setData(getShadedByteString(commitContainerCommandProto))
           .setStateMachineData(getShadedByteString(dataContainerCommandProto))
           .build();
+    } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+      log = SMLogEntryProto.newBuilder()
+          .setData(request.getMessage().getContent())
+          .setStateMachineData(request.getMessage().getContent())
+          .build();
     } else {
       log = SMLogEntryProto.newBuilder()
           .setData(request.getMessage().getContent())
@@ -154,12 +196,30 @@ public class ContainerStateMachine extends BaseStateMachine {
     try {
       final ContainerCommandRequestProto requestProto =
           getRequestProto(entry.getSmLogEntry().getStateMachineData());
-      final WriteChunkRequestProto write = requestProto.getWriteChunk();
-      Message raftClientReply = runCommand(requestProto);
-      CompletableFuture<Message> future =
-          CompletableFuture.completedFuture(raftClientReply);
-      writeChunkMap.put(write.getChunkData().getChunkName(),future);
-      return future;
+      if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+        String containerName =
+            requestProto.getCreateContainer().getContainerData().getName();
+        createContainerFutureMap.
+            computeIfAbsent(containerName, k -> new CompletableFuture<>());
+        return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+      } else {
+        final WriteChunkRequestProto write = requestProto.getWriteChunk();
+        String containerName = write.getPipeline().getContainerName();
+        CompletableFuture<Message> future =
+            createContainerFutureMap.get(containerName);
+
+        CompletableFuture<Message> writeChunkFuture;
+        if (future != null) {
+          writeChunkFuture = future.thenApplyAsync(
+              v -> runCommand(requestProto), writeChunkExecutor);
+        } else {
+          writeChunkFuture = CompletableFuture.supplyAsync(
+              () -> runCommand(requestProto), writeChunkExecutor);
+        }
+        writeChunkFutureMap
+            .put(write.getChunkData().getChunkName(), writeChunkFuture);
+        return writeChunkFuture;
+      }
     } catch (IOException e) {
       return completeExceptionally(e);
     }
@@ -186,13 +246,21 @@ public class ContainerStateMachine extends BaseStateMachine {
 
       if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) {
         WriteChunkRequestProto write = requestProto.getWriteChunk();
+        // the data field has already been removed in start Transaction
+        Preconditions.checkArgument(!write.hasData());
         CompletableFuture<Message> stateMachineFuture =
-            writeChunkMap.remove(write.getChunkData().getChunkName());
+            writeChunkFutureMap.remove(write.getChunkData().getChunkName());
         return stateMachineFuture
             .thenComposeAsync(v ->
                 CompletableFuture.completedFuture(runCommand(requestProto)));
       } else {
-        return CompletableFuture.completedFuture(runCommand(requestProto));
+        Message message = runCommand(requestProto);
+        if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+          String containerName =
+              requestProto.getCreateContainer().getContainerData().getName();
+          createContainerFutureMap.remove(containerName).complete(message);
+        }
+        return CompletableFuture.completedFuture(message);
       }
     } catch (IOException e) {
       return completeExceptionally(e);
@@ -207,6 +275,5 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   @Override
   public void close() throws IOException {
-    writeChunkExecutor.shutdown();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6c2b669/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 7baca25..ff52341 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.container.common.transport.server
     .XceiverServerSpi;
 
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
@@ -48,6 +47,9 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
 import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Creates a ratis server endpoint that acts as the communication layer for
@@ -57,6 +59,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
   private final int port;
   private final RaftServer server;
+  private ThreadPoolExecutor writeChunkExecutor;
 
   private XceiverServerRatis(DatanodeID id, int port, String storageDir,
       ContainerDispatcher dispatcher, Configuration conf) throws IOException {
@@ -68,6 +71,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     final int raftSegmentSize = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
+    final int raftSegmentPreallocatedSize = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
     final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
     final int numWriteChunkThreads = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
@@ -76,28 +82,34 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     Objects.requireNonNull(id, "id == null");
     this.port = port;
     RaftProperties serverProperties = newRaftProperties(rpc, port,
-        storageDir, maxChunkSize, raftSegmentSize);
-
+        storageDir, maxChunkSize, raftSegmentSize, raftSegmentPreallocatedSize);
+
+    writeChunkExecutor =
+        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
+            100, TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(1024),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    ContainerStateMachine stateMachine =
+        new ContainerStateMachine(dispatcher, writeChunkExecutor);
     this.server = RaftServer.newBuilder()
         .setServerId(RatisHelper.toRaftPeerId(id))
         .setGroup(RatisHelper.emptyRaftGroup())
         .setProperties(serverProperties)
-        .setStateMachine(new ContainerStateMachine(dispatcher,
-            numWriteChunkThreads))
+        .setStateMachine(stateMachine)
         .build();
   }
 
   private static RaftProperties newRaftProperties(
       RpcType rpc, int port, String storageDir, int scmChunkSize,
-      int raftSegmentSize) {
+      int raftSegmentSize, int raftSegmentPreallocatedSize) {
     final RaftProperties properties = new RaftProperties();
     RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
     RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
-        SizeInBytes.valueOf(raftSegmentSize));
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
     RaftServerConfigKeys.Log.setWriteBufferSize(properties,
         SizeInBytes.valueOf(scmChunkSize));
     RaftServerConfigKeys.Log.setPreallocatedSize(properties,
-        SizeInBytes.valueOf(raftSegmentSize));
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
     RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
         SizeInBytes.valueOf(raftSegmentSize));
     RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
@@ -106,9 +118,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     //TODO: change these configs to setter after RATIS-154
     properties.setInt("raft.server.log.segment.cache.num.max", 2);
     properties.setInt("raft.grpc.message.size.max",
-        scmChunkSize + raftSegmentSize);
-    properties.setInt("raft.server.rpc.timeout.min", 500);
-    properties.setInt("raft.server.rpc.timeout.max", 600);
+        scmChunkSize + raftSegmentPreallocatedSize);
+    properties.setInt("raft.server.rpc.timeout.min", 800);
+    properties.setInt("raft.server.rpc.timeout.max", 1000);
     if (rpc == SupportedRpcType.GRPC) {
       GrpcConfigKeys.Server.setPort(properties, port);
     } else {
@@ -171,12 +183,14 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   public void start() throws IOException {
     LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
         server.getId(), getIPCPort());
+    writeChunkExecutor.prestartAllCoreThreads();
     server.start();
   }
 
   @Override
   public void stop() {
     try {
+      writeChunkExecutor.shutdown();
       server.close();
     } catch (IOException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6c2b669/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index e1da595..434f5c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -356,10 +356,18 @@
   </property>
   <property>
     <name>dfs.container.ratis.segment.size</name>
-    <value>134217728</value>
+    <value>1073741824</value>
     <tag>OZONE, RATIS, PERFORMANCE</tag>
     <description>The size of the raft segment used by Apache Ratis on datanodes.
-      (128 MB by default)
+      (1 GB by default)
+    </description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.segment.preallocated.size</name>
+    <value>134217728</value>
+    <tag>OZONE, RATIS, PERFORMANCE</tag>
+    <description>The size of the buffer which is preallocated for raft segment
+      used by Apache Ratis on datanodes.(128 MB by default)
     </description>
   </property>
   <property>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org