You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/02/28 20:15:44 UTC
hadoop git commit: HDFS-11036. Ozone: reuse Xceiver connection.
Contributed by Chen Liang.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 ad16978e6 -> 00684d62c
HDFS-11036. Ozone: reuse Xceiver connection. Contributed by Chen Liang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/00684d62
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/00684d62
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/00684d62
Branch: refs/heads/HDFS-7240
Commit: 00684d62ca7e0f4be3eef05f9634ce4102b33ef3
Parents: ad16978
Author: Anu Engineer <ae...@apache.org>
Authored: Tue Feb 28 12:15:26 2017 -0800
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue Feb 28 12:15:26 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/scm/ScmConfigKeys.java | 5 +
.../apache/hadoop/scm/XceiverClientManager.java | 112 +++++++++++++++++--
2 files changed, 106 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00684d62/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 44414ea..1e7d994 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -30,6 +30,11 @@ public final class ScmConfigKeys {
"dfs.container.ipc";
public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
+ public static final String SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY =
+ "scm.container.client.idle.threshold";
+ public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT =
+ 10000;
+
// TODO : this is copied from OzoneConsts, may need to move to a better place
public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00684d62/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
index b9d7765..de706cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
@@ -19,27 +19,39 @@
package org.apache.hadoop.scm;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
+
/**
* XceiverClientManager is responsible for the lifecycle of XceiverClient
* instances. Callers use this class to acquire an XceiverClient instance
* connected to the desired container pipeline. When done, the caller also uses
* this class to release the previously acquired XceiverClient instance.
*
- * This class may evolve to implement efficient lifecycle management policies by
- * caching container location information and pooling connected client instances
- * for reuse without needing to reestablish a socket connection. The current
- * implementation simply allocates and closes a new instance every time.
+ *
+ * This class caches connection to container for reuse purpose, such that
+ * accessing same container frequently will be through the same connection
+ * without reestablishing connection. But the connection will be closed if
+ * not being used for a period of time.
*/
public class XceiverClientManager {
//TODO : change this to SCM configuration class
private final Configuration conf;
+ private Cache<String, XceiverClientWithAccessInfo> openClient;
+ private final long staleThresholdMs;
/**
* Creates a new XceiverClientManager.
@@ -48,13 +60,38 @@ public class XceiverClientManager {
*/
public XceiverClientManager(Configuration conf) {
Preconditions.checkNotNull(conf);
+ this.staleThresholdMs = conf.getTimeDuration(
+ SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
+ SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
this.conf = conf;
+ this.openClient = CacheBuilder.newBuilder()
+ .expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS)
+ .removalListener(
+ new RemovalListener<String, XceiverClientWithAccessInfo>() {
+ @Override
+ public void onRemoval(
+ RemovalNotification<String, XceiverClientWithAccessInfo>
+ removalNotification) {
+ // If the reference count is not 0, this xceiver client should not
+ // be evicted, add it back to the cache.
+ XceiverClientWithAccessInfo info = removalNotification.getValue();
+ if (info.hasRefence()) {
+ synchronized (XceiverClientManager.this.openClient) {
+ XceiverClientManager.this
+ .openClient.put(removalNotification.getKey(), info);
+ }
+ }
+ }
+ }).build();
}
/**
* Acquires a XceiverClient connected to a container capable of storing the
* specified key.
*
+ * If there is already a cached XceiverClient, simply return the cached
+ * otherwise create a new one.
+ *
* @param pipeline the container pipeline for the client connection
* @return XceiverClient connected to a container
* @throws IOException if an XceiverClient cannot be acquired
@@ -63,13 +100,28 @@ public class XceiverClientManager {
Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getMachines() != null);
Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
- XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
- try {
- xceiverClient.connect();
- } catch (Exception e) {
- throw new IOException("Exception connecting XceiverClient.", e);
+ String containerName = pipeline.getContainerName();
+ XceiverClientWithAccessInfo info = openClient.getIfPresent(containerName);
+
+ if (info != null) {
+ // we do have this connection, add reference and return
+ info.incrementReference();
+ return info.getXceiverClient();
+ } else {
+ // connection not found, create new, add reference and return
+ XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
+ try {
+ xceiverClient.connect();
+ } catch (Exception e) {
+ throw new IOException("Exception connecting XceiverClient.", e);
+ }
+ info = new XceiverClientWithAccessInfo(xceiverClient);
+ info.incrementReference();
+ synchronized (openClient) {
+ openClient.put(containerName, info);
+ }
+ return xceiverClient;
}
- return xceiverClient;
}
/**
@@ -79,6 +131,44 @@ public class XceiverClientManager {
*/
public void releaseClient(XceiverClient xceiverClient) {
Preconditions.checkNotNull(xceiverClient);
- xceiverClient.close();
+ String containerName = xceiverClient.getPipeline().getContainerName();
+ XceiverClientWithAccessInfo info;
+ synchronized (openClient) {
+ info = openClient.getIfPresent(containerName);
+ }
+ Preconditions.checkNotNull(info);
+ info.decrementReference();
+ }
+
+ /**
+ * A helper class for caching and cleaning XceiverClient. Three parameters:
+ * - the actual XceiverClient object
+ * - a time stamp representing the most recent access (acquire or release)
+ * - a reference count, +1 when acquire, -1 when release
+ */
+ private static class XceiverClientWithAccessInfo {
+ final private XceiverClient xceiverClient;
+ final private AtomicInteger referenceCount;
+
+ XceiverClientWithAccessInfo(XceiverClient xceiverClient) {
+ this.xceiverClient = xceiverClient;
+ this.referenceCount = new AtomicInteger(0);
+ }
+
+ void incrementReference() {
+ this.referenceCount.incrementAndGet();
+ }
+
+ void decrementReference() {
+ this.referenceCount.decrementAndGet();
+ }
+
+ boolean hasRefence() {
+ return this.referenceCount.get() != 0;
+ }
+
+ XceiverClient getXceiverClient() {
+ return xceiverClient;
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org