You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/12/18 22:37:40 UTC
hadoop git commit: HDFS-12890 . Ozone: XceiverClient should have
upper bound on async requests. Contributed by Shashikant Banerjee.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 65240bc28 -> cb74ddf5e
HDFS-12890 . Ozone: XceiverClient should have upper bound on async requests. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cb74ddf5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cb74ddf5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cb74ddf5
Branch: refs/heads/HDFS-7240
Commit: cb74ddf5e4f05cd27b5d55bc5099901dc2571edd
Parents: 65240bc
Author: Anu Engineer <ae...@apache.org>
Authored: Mon Dec 18 14:27:45 2017 -0800
Committer: Anu Engineer <ae...@apache.org>
Committed: Mon Dec 18 14:27:45 2017 -0800
----------------------------------------------------------------------
.../hadoop/ozone/client/OzoneClientUtils.java | 10 ++++++++
.../org/apache/hadoop/scm/ScmConfigKeys.java | 5 ++++
.../org/apache/hadoop/scm/XceiverClient.java | 25 ++++++++++++++++----
.../apache/hadoop/scm/XceiverClientHandler.java | 17 +++++++++++--
.../hadoop/scm/XceiverClientInitializer.java | 8 +++++--
.../apache/hadoop/scm/XceiverClientRatis.java | 13 ++++++++--
.../src/main/resources/ozone-default.xml | 10 ++++++++
7 files changed, 78 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
index 7381bc1..1830bdf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
@@ -861,4 +861,14 @@ public final class OzoneClientUtils {
return ZonedDateTime.parse(date, DATE_FORMAT.get())
.toInstant().getEpochSecond();
}
+
+ /**
+ * Returns the maximum number of outstanding async requests to be handled by
+ * the Standalone and Ratis clients.
+ */
+ public static int getMaxOutstandingRequests(Configuration config) {
+ return config
+ .getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS,
+ ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 410f3983..b79f72b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -37,6 +37,11 @@ public final class ScmConfigKeys {
public static final int SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT =
256;
+ public static final String SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS =
+ "scm.container.client.max.outstanding.requests";
+ public static final int SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT
+ = 100;
+
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= "dfs.container.ratis.enabled";
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
index 60f0998..bde9064 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.List;
+import java.util.concurrent.Semaphore;
/**
* A Client for the storageContainer protocol.
@@ -51,6 +53,7 @@ public class XceiverClient extends XceiverClientSpi {
private Channel channel;
private Bootstrap b;
private EventLoopGroup group;
+ private final Semaphore semaphore;
/**
* Constructs a client that can communicate with the Container framework on
@@ -65,6 +68,8 @@ public class XceiverClient extends XceiverClientSpi {
Preconditions.checkNotNull(config);
this.pipeline = pipeline;
this.config = config;
+ this.semaphore =
+ new Semaphore(OzoneClientUtils.getMaxOutstandingRequests(config));
}
@Override
@@ -78,7 +83,7 @@ public class XceiverClient extends XceiverClientSpi {
b.group(group)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
- .handler(new XceiverClientInitializer(this.pipeline));
+ .handler(new XceiverClientInitializer(this.pipeline, semaphore));
DatanodeID leader = this.pipeline.getLeader();
// read port from the data node, on failure use default configured
@@ -116,8 +121,7 @@ public class XceiverClient extends XceiverClientSpi {
@Override
public ContainerProtos.ContainerCommandResponseProto sendCommand(
- ContainerProtos.ContainerCommandRequestProto request)
- throws IOException {
+ ContainerProtos.ContainerCommandRequestProto request) throws IOException {
try {
if ((channel == null) || (!channel.isActive())) {
throw new IOException("This channel is not connected.");
@@ -127,7 +131,20 @@ public class XceiverClient extends XceiverClientSpi {
return handler.sendCommand(request);
} catch (ExecutionException | InterruptedException e) {
- throw new IOException("Unexpected exception during execution", e);
+ /**
+ * In case the netty channel handler throws an exception,
+ * the exception thrown will be wrapped within {@link ExecutionException}.
+ * Unwrapping here so that the original exception gets passed
+ * to the client.
+ */
+ if (e instanceof ExecutionException) {
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ }
+ throw new IOException(
+ "Unexpected exception during execution:" + e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
index 1d91b14..ac2cf1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
@@ -32,11 +32,13 @@ import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
/**
* Netty client handler.
@@ -51,15 +53,17 @@ public class XceiverClientHandler extends
private final Pipeline pipeline;
private volatile Channel channel;
private XceiverClientMetrics metrics;
+ private final Semaphore semaphore;
/**
* Constructs a client that can communicate to a container server.
*/
- public XceiverClientHandler(Pipeline pipeline) {
+ public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) {
super(false);
Preconditions.checkNotNull(pipeline);
this.pipeline = pipeline;
this.metrics = XceiverClientManager.getXceiverClientMetrics();
+ this.semaphore = semaphore;
}
/**
@@ -83,6 +87,7 @@ public class XceiverClientHandler extends
String key = msg.getTraceID();
ResponseFuture response = responses.remove(key);
+ semaphore.release();
if (response != null) {
response.getFuture().complete(msg);
@@ -105,6 +110,12 @@ public class XceiverClientHandler extends
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.info("Exception in client " + cause.toString());
+ Iterator<String> keyIterator = responses.keySet().iterator();
+ while (keyIterator.hasNext()) {
+ ResponseFuture response = responses.remove(keyIterator.next());
+ response.getFuture().completeExceptionally(cause);
+ semaphore.release();
+ }
ctx.close();
}
@@ -133,7 +144,8 @@ public class XceiverClientHandler extends
* @return CompletableFuture
*/
public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
- ContainerProtos.ContainerCommandRequestProto request) {
+ ContainerProtos.ContainerCommandRequestProto request)
+ throws InterruptedException {
// Throw an exception of request doesn't have traceId
if (StringUtils.isEmpty(request.getTraceID())) {
@@ -152,6 +164,7 @@ public class XceiverClientHandler extends
= new CompletableFuture<>();
ResponseFuture response = new ResponseFuture(future,
Time.monotonicNowNanos());
+ semaphore.acquire();
ResponseFuture previous = responses.putIfAbsent(
request.getTraceID(), response);
if (previous != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java
index fbfb7ca..6aac960 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java
@@ -27,19 +27,23 @@ import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import java.util.concurrent.Semaphore;
+
/**
* Setup the netty pipeline.
*/
public class XceiverClientInitializer extends
ChannelInitializer<SocketChannel> {
private final Pipeline pipeline;
+ private final Semaphore semaphore;
/**
* Constructs an Initializer for the client pipeline.
* @param pipeline - Pipeline.
*/
- public XceiverClientInitializer(Pipeline pipeline) {
+ public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) {
this.pipeline = pipeline;
+ this.semaphore = semaphore;
}
/**
@@ -62,7 +66,7 @@ public class XceiverClientInitializer extends
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
- p.addLast(new XceiverClientHandler(this.pipeline));
+ p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
index 5e171b1..12ee328 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.RatisHelper;
@@ -56,19 +57,24 @@ public final class XceiverClientRatis extends XceiverClientSpi {
final String rpcType = ozoneConf.get(
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+ final int maxOutstandingRequests =
+ OzoneClientUtils.getMaxOutstandingRequests(ozoneConf);
return new XceiverClientRatis(pipeline,
- SupportedRpcType.valueOfIgnoreCase(rpcType));
+ SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests);
}
private final Pipeline pipeline;
private final RpcType rpcType;
private final AtomicReference<RaftClient> client = new AtomicReference<>();
+ private final int maxOutstandingRequests;
/** Constructs a client. */
- private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) {
+ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
+ int maxOutStandingChunks) {
super();
this.pipeline = pipeline;
this.rpcType = rpcType;
+ this.maxOutstandingRequests = maxOutStandingChunks;
}
/**
@@ -147,6 +153,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
LOG.debug("Connecting to pipeline:{} leader:{}",
getPipeline().getPipelineName(),
RatisHelper.toRaftPeerId(pipeline.getLeader()));
+ // TODO: XceiverClientRatis should pass the configured value of
+ // maxOutstandingRequests to the raft client so as to set the upper bound on
+ // the number of async requests it handles.
if (!client.compareAndSet(null,
RatisHelper.newRaftClient(rpcType, getPipeline()))) {
throw new IllegalStateException("Client is already connected.");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb74ddf5/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index 8c248ea..1c8d086 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -1097,6 +1097,16 @@
</property>
<property>
+ <name>scm.container.client.max.outstanding.requests</name>
+ <value>100</value>
+ <tag>OZONE, PERFORMANCE</tag>
+ <description>
+ Controls the maximum number of outstanding async requests that can be
+ handled by the Standalone as well as Ratis client.
+ </description>
+ </property>
+
+ <property>
<name>ozone.scm.container.creation.lease.timeout</name>
<value>60s</value>
<tag>OZONE, SCM</tag>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org