You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/02/20 18:54:25 UTC

[2/2] hadoop git commit: HDFS-13078. Ozone: Update Ratis on Ozone to 0.1.1-alpha-8fd74ed-SNAPSHOT. To fix large chunk reads (>4M) from Datanodes. Contributed by Mukul Kumar Singh.

HDFS-13078. Ozone: Update Ratis on Ozone to 0.1.1-alpha-8fd74ed-SNAPSHOT.
To fix large chunk reads (>4M) from Datanodes. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c8a8ee50
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c8a8ee50
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c8a8ee50

Branch: refs/heads/HDFS-7240
Commit: c8a8ee5000692feb2354edb42d86578155b9c6d2
Parents: 9af91f5
Author: Anu Engineer <ae...@apache.org>
Authored: Tue Feb 20 10:53:33 2018 -0800
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue Feb 20 10:53:33 2018 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/ratis/RatisHelper.java |   5 +
 .../server/ratis/ContainerStateMachine.java     |  87 ++++++-----
 .../hadoop/ozone/tools/TestDataValidate.java    | 146 +++++++++++++++++++
 .../apache/hadoop/ozone/tools/TestFreon.java    |  29 ----
 hadoop-project/pom.xml                          |   2 +-
 5 files changed, 201 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8a8ee50/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
index 3fabe48..de159ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
@@ -19,15 +19,18 @@
 package org.apache.ratis;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,6 +116,8 @@ public interface RatisHelper {
     LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
     final RaftProperties properties = new RaftProperties();
     RaftConfigKeys.Rpc.setType(properties, rpcType);
+    GrpcConfigKeys.setMessageSizeMax(properties,
+        SizeInBytes.valueOf(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE));
 
     return RaftClient.newBuilder()
         .setRaftGroup(group)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8a8ee50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index c96cc5d..569fb23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -26,17 +26,17 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkReq
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.ratis.statemachine.BaseStateMachine;
-import org.apache.ratis.statemachine.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.TransactionContextImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
@@ -55,8 +55,7 @@ import java.util.concurrent.ConcurrentHashMap;
  *
  * Read only requests are classified in
  * {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly}
- * and these readonly requests are replied from the
- * {@link #query(RaftClientRequest)}
+ * and these readonly requests are replied from the {@link #query(Message)}.
  *
  * The write requests can be divided into requests with user data
  * (WriteChunkRequest) and other request without user data.
@@ -90,7 +89,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       = new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
   private ThreadPoolExecutor writeChunkExecutor;
-  private final ConcurrentHashMap<String, CompletableFuture<Message>>
+  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       writeChunkFutureMap;
   private final ConcurrentHashMap<String, CompletableFuture<Message>>
       createContainerFutureMap;
@@ -171,7 +170,7 @@ public class ContainerStateMachine extends BaseStateMachine {
           .setData(request.getMessage().getContent())
           .build();
     }
-    return new TransactionContext(this, request, log);
+    return new TransactionContextImpl(this, request, log);
   }
 
   private ByteString getShadedByteString(ContainerCommandRequestProto proto) {
@@ -191,34 +190,47 @@ public class ContainerStateMachine extends BaseStateMachine {
     return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray());
   }
 
+  private CompletableFuture<Message> handleWriteChunk(
+      ContainerCommandRequestProto requestProto, long entryIndex) {
+    final WriteChunkRequestProto write = requestProto.getWriteChunk();
+    String containerName = write.getPipeline().getContainerName();
+    CompletableFuture<Message> future =
+        createContainerFutureMap.get(containerName);
+    CompletableFuture<Message> writeChunkFuture;
+    if (future != null) {
+      writeChunkFuture = future.thenApplyAsync(
+          v -> runCommand(requestProto), writeChunkExecutor);
+    } else {
+      writeChunkFuture = CompletableFuture.supplyAsync(
+          () -> runCommand(requestProto), writeChunkExecutor);
+    }
+    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    return writeChunkFuture;
+  }
+
+  private CompletableFuture<Message> handleCreateContainer(
+      ContainerCommandRequestProto requestProto) {
+    String containerName =
+        requestProto.getCreateContainer().getContainerData().getName();
+    createContainerFutureMap.
+        computeIfAbsent(containerName, k -> new CompletableFuture<>());
+    return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+  }
+
   @Override
   public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
     try {
       final ContainerCommandRequestProto requestProto =
           getRequestProto(entry.getSmLogEntry().getStateMachineData());
-      if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
-        String containerName =
-            requestProto.getCreateContainer().getContainerData().getName();
-        createContainerFutureMap.
-            computeIfAbsent(containerName, k -> new CompletableFuture<>());
-        return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
-      } else {
-        final WriteChunkRequestProto write = requestProto.getWriteChunk();
-        String containerName = write.getPipeline().getContainerName();
-        CompletableFuture<Message> future =
-            createContainerFutureMap.get(containerName);
-
-        CompletableFuture<Message> writeChunkFuture;
-        if (future != null) {
-          writeChunkFuture = future.thenApplyAsync(
-              v -> runCommand(requestProto), writeChunkExecutor);
-        } else {
-          writeChunkFuture = CompletableFuture.supplyAsync(
-              () -> runCommand(requestProto), writeChunkExecutor);
-        }
-        writeChunkFutureMap
-            .put(write.getChunkData().getChunkName(), writeChunkFuture);
-        return writeChunkFuture;
+      ContainerProtos.Type cmdType = requestProto.getCmdType();
+      switch (cmdType) {
+      case CreateContainer:
+        return handleCreateContainer(requestProto);
+      case WriteChunk:
+        return handleWriteChunk(requestProto, entry.getIndex());
+      default:
+        throw new IllegalStateException("Cmd Type:" + cmdType
+            + " should not have state machine data");
       }
     } catch (IOException e) {
       return completeExceptionally(e);
@@ -226,13 +238,11 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public CompletableFuture<RaftClientReply> query(RaftClientRequest request) {
+  public CompletableFuture<Message> query(Message request) {
     try {
       final ContainerCommandRequestProto requestProto =
-          getRequestProto(request.getMessage().getContent());
-      RaftClientReply raftClientReply =
-          new RaftClientReply(request, runCommand(requestProto));
-      return CompletableFuture.completedFuture(raftClientReply);
+          getRequestProto(request.getContent());
+      return CompletableFuture.completedFuture(runCommand(requestProto));
     } catch (IOException e) {
       return completeExceptionally(e);
     }
@@ -243,19 +253,20 @@ public class ContainerStateMachine extends BaseStateMachine {
     try {
       ContainerCommandRequestProto requestProto =
           getRequestProto(trx.getSMLogEntry().getData());
+      ContainerProtos.Type cmdType = requestProto.getCmdType();
 
-      if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) {
+      if (cmdType == ContainerProtos.Type.WriteChunk) {
         WriteChunkRequestProto write = requestProto.getWriteChunk();
         // the data field has already been removed in start Transaction
         Preconditions.checkArgument(!write.hasData());
         CompletableFuture<Message> stateMachineFuture =
-            writeChunkFutureMap.remove(write.getChunkData().getChunkName());
+            writeChunkFutureMap.remove(trx.getLogEntry().getIndex());
         return stateMachineFuture
             .thenComposeAsync(v ->
                 CompletableFuture.completedFuture(runCommand(requestProto)));
       } else {
         Message message = runCommand(requestProto);
-        if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+        if (cmdType == ContainerProtos.Type.CreateContainer) {
           String containerName =
               requestProto.getCreateContainer().getContainerData().getName();
           createContainerFutureMap.remove(containerName).complete(message);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8a8ee50/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestDataValidate.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestDataValidate.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestDataValidate.java
new file mode 100644
index 0000000..0ba1cde
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestDataValidate.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.tools;
+
+import org.apache.hadoop.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests Freon, with MiniOzoneCluster and validate data.
+ */
+public class TestDataValidate {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = new MiniOzoneClassicCluster.Builder(conf)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
+        .numDataNodes(5).build();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void ratisTestLargeKey() throws Exception {
+    List<String> args = new ArrayList<>();
+    args.add("-validateWrites");
+    args.add("-numOfVolumes");
+    args.add("1");
+    args.add("-numOfBuckets");
+    args.add("1");
+    args.add("-numOfKeys");
+    args.add("1");
+    args.add("-ratis");
+    args.add("3");
+    args.add("-keySize");
+    args.add("104857600");
+    Freon freon = new Freon(conf);
+    int res = ToolRunner.run(conf, freon,
+        args.toArray(new String[0]));
+    Assert.assertEquals(1, freon.getNumberOfVolumesCreated());
+    Assert.assertEquals(1, freon.getNumberOfBucketsCreated());
+    Assert.assertEquals(1, freon.getNumberOfKeysAdded());
+    Assert.assertEquals(0, freon.getUnsuccessfulValidationCount());
+    Assert.assertEquals(0, res);
+  }
+
+  @Test
+  public void standaloneTestLargeKey() throws Exception {
+    List<String> args = new ArrayList<>();
+    args.add("-validateWrites");
+    args.add("-numOfVolumes");
+    args.add("1");
+    args.add("-numOfBuckets");
+    args.add("1");
+    args.add("-numOfKeys");
+    args.add("1");
+    args.add("-keySize");
+    args.add("104857600");
+    Freon freon = new Freon(conf);
+    int res = ToolRunner.run(conf, freon,
+        args.toArray(new String[0]));
+    Assert.assertEquals(1, freon.getNumberOfVolumesCreated());
+    Assert.assertEquals(1, freon.getNumberOfBucketsCreated());
+    Assert.assertEquals(1, freon.getNumberOfKeysAdded());
+    Assert.assertEquals(0, freon.getUnsuccessfulValidationCount());
+    Assert.assertEquals(0, res);
+  }
+
+  @Test
+  public void validateWriteTest() throws Exception {
+    PrintStream originalStream = System.out;
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(outStream));
+    List<String> args = new ArrayList<>();
+    args.add("-validateWrites");
+    args.add("-numOfVolumes");
+    args.add("2");
+    args.add("-numOfBuckets");
+    args.add("5");
+    args.add("-numOfKeys");
+    args.add("10");
+    Freon freon = new Freon(conf);
+    int res = ToolRunner.run(conf, freon,
+        args.toArray(new String[0]));
+    Assert.assertEquals(0, res);
+    Assert.assertEquals(2, freon.getNumberOfVolumesCreated());
+    Assert.assertEquals(10, freon.getNumberOfBucketsCreated());
+    Assert.assertEquals(100, freon.getNumberOfKeysAdded());
+    Assert.assertTrue(freon.getValidateWrites());
+    Assert.assertNotEquals(0, freon.getTotalKeysValidated());
+    Assert.assertNotEquals(0, freon.getSuccessfulValidationCount());
+    Assert.assertEquals(0, freon.getUnsuccessfulValidationCount());
+    System.setOut(originalStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8a8ee50/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestFreon.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestFreon.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestFreon.java
index c356ea3..d002e9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestFreon.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestFreon.java
@@ -29,9 +29,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -91,33 +89,6 @@ public class TestFreon {
   }
 
   @Test
-  public void validateWriteTest() throws Exception {
-    PrintStream originalStream = System.out;
-    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-    System.setOut(new PrintStream(outStream));
-    List<String> args = new ArrayList<>();
-    args.add("-validateWrites");
-    args.add("-numOfVolumes");
-    args.add("2");
-    args.add("-numOfBuckets");
-    args.add("5");
-    args.add("-numOfKeys");
-    args.add("10");
-    Freon freon = new Freon(conf);
-    int res = ToolRunner.run(conf, freon,
-        args.toArray(new String[0]));
-    Assert.assertEquals(0, res);
-    Assert.assertEquals(2, freon.getNumberOfVolumesCreated());
-    Assert.assertEquals(10, freon.getNumberOfBucketsCreated());
-    Assert.assertEquals(100, freon.getNumberOfKeysAdded());
-    Assert.assertTrue(freon.getValidateWrites());
-    Assert.assertNotEquals(0, freon.getTotalKeysValidated());
-    Assert.assertNotEquals(0, freon.getSuccessfulValidationCount());
-    Assert.assertEquals(0, freon.getUnsuccessfulValidationCount());
-    System.setOut(originalStream);
-  }
-
-  @Test
   public void multiThread() throws Exception {
     List<String> args = new ArrayList<>();
     args.add("-numOfVolumes");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8a8ee50/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index ac1d033..b4ffb97 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -100,7 +100,7 @@
     <ldap-api.version>1.0.0-M33</ldap-api.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.1.1-alpha-0f7169d-SNAPSHOT</ratis.version>
+    <ratis.version>0.1.1-alpha-8fd74ed-SNAPSHOT</ratis.version>
     <jcache.version>1.0-alpha-1</jcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <hikari.version>2.4.12</hikari.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org