You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/12/18 22:37:40 UTC

hadoop git commit: HDFS-12890 . Ozone: XceiverClient should have upper bound on async requests. Contributed by Shashikant Banerjee.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 65240bc28 -> cb74ddf5e


HDFS-12890 . Ozone: XceiverClient should have upper bound on async requests. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cb74ddf5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cb74ddf5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cb74ddf5

Branch: refs/heads/HDFS-7240
Commit: cb74ddf5e4f05cd27b5d55bc5099901dc2571edd
Parents: 65240bc
Author: Anu Engineer <ae...@apache.org>
Authored: Mon Dec 18 14:27:45 2017 -0800
Committer: Anu Engineer <ae...@apache.org>
Committed: Mon Dec 18 14:27:45 2017 -0800

----------------------------------------------------------------------
 .../hadoop/ozone/client/OzoneClientUtils.java   | 10 ++++++++
 .../org/apache/hadoop/scm/ScmConfigKeys.java    |  5 ++++
 .../org/apache/hadoop/scm/XceiverClient.java    | 25 ++++++++++++++++----
 .../apache/hadoop/scm/XceiverClientHandler.java | 17 +++++++++++--
 .../hadoop/scm/XceiverClientInitializer.java    |  8 +++++--
 .../apache/hadoop/scm/XceiverClientRatis.java   | 13 ++++++++--
 .../src/main/resources/ozone-default.xml        | 10 ++++++++
 7 files changed, 78 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
index 7381bc1..1830bdf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
@@ -861,4 +861,14 @@ public final class OzoneClientUtils {
     return ZonedDateTime.parse(date, DATE_FORMAT.get())
         .toInstant().getEpochSecond();
   }
+
+  /**
+   * Returns the maximum number of outstanding async requests to be handled
+   * by the Standalone and Ratis clients.
+   */
+  public static int getMaxOutstandingRequests(Configuration config) {
+    return config
+        .getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS,
+            ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 410f3983..b79f72b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -37,6 +37,11 @@ public final class ScmConfigKeys {
   public static final int SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT =
       256;
 
+  public static final String SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS =
+      "scm.container.client.max.outstanding.requests";
+  public static final int SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT
+      = 100;
+
   public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
       = "dfs.container.ratis.enabled";
   public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
index 60f0998..bde9064 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.List;
+import java.util.concurrent.Semaphore;
 
 /**
  * A Client for the storageContainer protocol.
@@ -51,6 +53,7 @@ public class XceiverClient extends XceiverClientSpi {
   private Channel channel;
   private Bootstrap b;
   private EventLoopGroup group;
+  private final Semaphore semaphore;
 
   /**
    * Constructs a client that can communicate with the Container framework on
@@ -65,6 +68,8 @@ public class XceiverClient extends XceiverClientSpi {
     Preconditions.checkNotNull(config);
     this.pipeline = pipeline;
     this.config = config;
+    this.semaphore =
+        new Semaphore(OzoneClientUtils.getMaxOutstandingRequests(config));
   }
 
   @Override
@@ -78,7 +83,7 @@ public class XceiverClient extends XceiverClientSpi {
     b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
-        .handler(new XceiverClientInitializer(this.pipeline));
+        .handler(new XceiverClientInitializer(this.pipeline, semaphore));
     DatanodeID leader = this.pipeline.getLeader();
 
     // read port from the data node, on failure use default configured
@@ -116,8 +121,7 @@ public class XceiverClient extends XceiverClientSpi {
 
   @Override
   public ContainerProtos.ContainerCommandResponseProto sendCommand(
-      ContainerProtos.ContainerCommandRequestProto request)
-      throws IOException {
+      ContainerProtos.ContainerCommandRequestProto request) throws IOException {
     try {
       if ((channel == null) || (!channel.isActive())) {
         throw new IOException("This channel is not connected.");
@@ -127,7 +131,20 @@ public class XceiverClient extends XceiverClientSpi {
 
       return handler.sendCommand(request);
     } catch (ExecutionException | InterruptedException e) {
-      throw new IOException("Unexpected exception during execution", e);
+      /**
+       * In case the netty channel handler throws an exception,
+       * the exception thrown will be wrapped within {@link ExecutionException}.
+       * Unwrapping here so that the original exception gets passed
+       * to the client.
+       */
+      if (e instanceof ExecutionException) {
+        Throwable cause = e.getCause();
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        }
+      }
+      throw new IOException(
+          "Unexpected exception during execution:" + e.getMessage());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
index 1d91b14..ac2cf1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
@@ -32,11 +32,13 @@ import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 
 /**
  * Netty client handler.
@@ -51,15 +53,17 @@ public class XceiverClientHandler extends
   private final Pipeline pipeline;
   private volatile Channel channel;
   private XceiverClientMetrics metrics;
+  private final Semaphore semaphore;
 
   /**
    * Constructs a client that can communicate to a container server.
    */
-  public XceiverClientHandler(Pipeline pipeline) {
+  public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) {
     super(false);
     Preconditions.checkNotNull(pipeline);
     this.pipeline = pipeline;
     this.metrics = XceiverClientManager.getXceiverClientMetrics();
+    this.semaphore = semaphore;
   }
 
   /**
@@ -83,6 +87,7 @@ public class XceiverClientHandler extends
 
     String key = msg.getTraceID();
     ResponseFuture response = responses.remove(key);
+    semaphore.release();
 
     if (response != null) {
       response.getFuture().complete(msg);
@@ -105,6 +110,12 @@ public class XceiverClientHandler extends
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
     LOG.info("Exception in client " + cause.toString());
+    Iterator<String> keyIterator = responses.keySet().iterator();
+    while (keyIterator.hasNext()) {
+      ResponseFuture response = responses.remove(keyIterator.next());
+      response.getFuture().completeExceptionally(cause);
+      semaphore.release();
+    }
     ctx.close();
   }
 
@@ -133,7 +144,8 @@ public class XceiverClientHandler extends
    * @return CompletableFuture
    */
   public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
-      ContainerProtos.ContainerCommandRequestProto request) {
+      ContainerProtos.ContainerCommandRequestProto request)
+      throws InterruptedException {
 
     // Throw an exception of request doesn't have traceId
     if (StringUtils.isEmpty(request.getTraceID())) {
@@ -152,6 +164,7 @@ public class XceiverClientHandler extends
         = new CompletableFuture<>();
     ResponseFuture response = new ResponseFuture(future,
         Time.monotonicNowNanos());
+    semaphore.acquire();
     ResponseFuture previous = responses.putIfAbsent(
         request.getTraceID(), response);
     if (previous != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java
index fbfb7ca..6aac960 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java
@@ -27,19 +27,23 @@ import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
+import java.util.concurrent.Semaphore;
+
 /**
  * Setup the netty pipeline.
  */
 public class XceiverClientInitializer extends
     ChannelInitializer<SocketChannel> {
   private final Pipeline pipeline;
+  private final Semaphore semaphore;
 
   /**
    * Constructs an Initializer for the client pipeline.
    * @param pipeline  - Pipeline.
    */
-  public XceiverClientInitializer(Pipeline pipeline) {
+  public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) {
     this.pipeline = pipeline;
+    this.semaphore = semaphore;
   }
 
   /**
@@ -62,7 +66,7 @@ public class XceiverClientInitializer extends
     p.addLast(new ProtobufVarint32LengthFieldPrepender());
     p.addLast(new ProtobufEncoder());
 
-    p.addLast(new XceiverClientHandler(this.pipeline));
+    p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore));
 
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
index 5e171b1..12ee328 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.ratis.RatisHelper;
@@ -56,19 +57,24 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     final String rpcType = ozoneConf.get(
         ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
         ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final int maxOutstandingRequests =
+        OzoneClientUtils.getMaxOutstandingRequests(ozoneConf);
     return new XceiverClientRatis(pipeline,
-        SupportedRpcType.valueOfIgnoreCase(rpcType));
+        SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests);
   }
 
   private final Pipeline pipeline;
   private final RpcType rpcType;
   private final AtomicReference<RaftClient> client = new AtomicReference<>();
+  private final int maxOutstandingRequests;
 
   /** Constructs a client. */
-  private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) {
+  private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
+      int maxOutStandingChunks) {
     super();
     this.pipeline = pipeline;
     this.rpcType = rpcType;
+    this.maxOutstandingRequests = maxOutStandingChunks;
   }
 
   /**
@@ -147,6 +153,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     LOG.debug("Connecting to pipeline:{} leader:{}",
         getPipeline().getPipelineName(),
         RatisHelper.toRaftPeerId(pipeline.getLeader()));
+    // TODO : XceiverClientRatis should pass the configured value of
+    // maxOutstandingRequests to the Raft client so that it enforces an upper
+    // bound on the number of outstanding async requests.
     if (!client.compareAndSet(null,
         RatisHelper.newRaftClient(rpcType, getPipeline()))) {
       throw new IllegalStateException("Client is already connected.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index 8c248ea..1c8d086 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -1097,6 +1097,16 @@
   </property>
 
   <property>
+    <name>scm.container.client.max.outstanding.requests</name>
+    <value>100</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>
+      Controls the maximum number of outstanding async requests that can be
+      handled by the Standalone as well as Ratis client.
+    </description>
+  </property>
+
+  <property>
     <name>ozone.scm.container.creation.lease.timeout</name>
     <value>60s</value>
     <tag>OZONE, SCM</tag>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org