You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/02/28 20:15:44 UTC

hadoop git commit: HDFS-11036. Ozone: reuse Xceiver connection. Contributed by Chen Liang.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 ad16978e6 -> 00684d62c


HDFS-11036. Ozone: reuse Xceiver connection. Contributed by Chen Liang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/00684d62
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/00684d62
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/00684d62

Branch: refs/heads/HDFS-7240
Commit: 00684d62ca7e0f4be3eef05f9634ce4102b33ef3
Parents: ad16978
Author: Anu Engineer <ae...@apache.org>
Authored: Tue Feb 28 12:15:26 2017 -0800
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue Feb 28 12:15:26 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/scm/ScmConfigKeys.java    |   5 +
 .../apache/hadoop/scm/XceiverClientManager.java | 112 +++++++++++++++++--
 2 files changed, 106 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/00684d62/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 44414ea..1e7d994 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -30,6 +30,11 @@ public final class ScmConfigKeys {
       "dfs.container.ipc";
   public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
 
+  public static final String SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY =
+      "scm.container.client.idle.threshold";
+  public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT =
+      10000;
+
   // TODO : this is copied from OzoneConsts, may need to move to a better place
   public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00684d62/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
index b9d7765..de706cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
@@ -19,27 +19,39 @@
 package org.apache.hadoop.scm;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Preconditions;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
+import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
+
 /**
  * XceiverClientManager is responsible for the lifecycle of XceiverClient
  * instances.  Callers use this class to acquire an XceiverClient instance
  * connected to the desired container pipeline.  When done, the caller also uses
  * this class to release the previously acquired XceiverClient instance.
  *
- * This class may evolve to implement efficient lifecycle management policies by
- * caching container location information and pooling connected client instances
- * for reuse without needing to reestablish a socket connection.  The current
- * implementation simply allocates and closes a new instance every time.
+ *
+ * This class caches connection to container for reuse purpose, such that
+ * accessing same container frequently will be through the same connection
+ * without reestablishing connection. But the connection will be closed if
+ * not being used for a period of time.
  */
 public class XceiverClientManager {
 
   //TODO : change this to SCM configuration class
   private final Configuration conf;
+  private Cache<String, XceiverClientWithAccessInfo> openClient;
+  private final long staleThresholdMs;
 
   /**
    * Creates a new XceiverClientManager.
@@ -48,13 +60,38 @@ public class XceiverClientManager {
    */
   public XceiverClientManager(Configuration conf) {
     Preconditions.checkNotNull(conf);
+    this.staleThresholdMs = conf.getTimeDuration(
+        SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
+        SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
     this.conf = conf;
+    this.openClient = CacheBuilder.newBuilder()
+        .expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS)
+        .removalListener(
+            new RemovalListener<String, XceiverClientWithAccessInfo>() {
+            @Override
+            public void onRemoval(
+                RemovalNotification<String, XceiverClientWithAccessInfo>
+                  removalNotification) {
+              // If the reference count is not 0, this xceiver client should not
+              // be evicted, add it back to the cache.
+              XceiverClientWithAccessInfo info = removalNotification.getValue();
+              if (info.hasRefence()) {
+                synchronized (XceiverClientManager.this.openClient) {
+                  XceiverClientManager.this
+                      .openClient.put(removalNotification.getKey(), info);
+                }
+              }
+            }
+          }).build();
   }
 
   /**
    * Acquires a XceiverClient connected to a container capable of storing the
    * specified key.
    *
+   * If there is already a cached XceiverClient, simply return the cached
+   * otherwise create a new one.
+   *
    * @param pipeline the container pipeline for the client connection
    * @return XceiverClient connected to a container
    * @throws IOException if an XceiverClient cannot be acquired
@@ -63,13 +100,28 @@ public class XceiverClientManager {
     Preconditions.checkNotNull(pipeline);
     Preconditions.checkArgument(pipeline.getMachines() != null);
     Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
-    XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
-    try {
-      xceiverClient.connect();
-    } catch (Exception e) {
-      throw new IOException("Exception connecting XceiverClient.", e);
+    String containerName = pipeline.getContainerName();
+    XceiverClientWithAccessInfo info = openClient.getIfPresent(containerName);
+
+    if (info != null) {
+      // we do have this connection, add reference and return
+      info.incrementReference();
+      return info.getXceiverClient();
+    } else {
+      // connection not found, create new, add reference and return
+      XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
+      try {
+        xceiverClient.connect();
+      } catch (Exception e) {
+        throw new IOException("Exception connecting XceiverClient.", e);
+      }
+      info = new XceiverClientWithAccessInfo(xceiverClient);
+      info.incrementReference();
+      synchronized (openClient) {
+        openClient.put(containerName, info);
+      }
+      return xceiverClient;
     }
-    return xceiverClient;
   }
 
   /**
@@ -79,6 +131,44 @@ public class XceiverClientManager {
    */
   public void releaseClient(XceiverClient xceiverClient) {
     Preconditions.checkNotNull(xceiverClient);
-    xceiverClient.close();
+    String containerName = xceiverClient.getPipeline().getContainerName();
+    XceiverClientWithAccessInfo info;
+    synchronized (openClient) {
+      info = openClient.getIfPresent(containerName);
+    }
+    Preconditions.checkNotNull(info);
+    info.decrementReference();
+  }
+
+  /**
+   * A helper class for caching and cleaning XceiverClient. Three parameters:
+   * - the actual XceiverClient object
+   * - a time stamp representing the most recent access (acquire or release)
+   * - a reference count, +1 when acquire, -1 when release
+   */
+  private static class XceiverClientWithAccessInfo {
+    final private XceiverClient xceiverClient;
+    final private AtomicInteger referenceCount;
+
+    XceiverClientWithAccessInfo(XceiverClient xceiverClient) {
+      this.xceiverClient = xceiverClient;
+      this.referenceCount = new AtomicInteger(0);
+    }
+
+    void incrementReference() {
+      this.referenceCount.incrementAndGet();
+    }
+
+    void decrementReference() {
+      this.referenceCount.decrementAndGet();
+    }
+
+    boolean hasRefence() {
+      return this.referenceCount.get() != 0;
+    }
+
+    XceiverClient getXceiverClient() {
+      return xceiverClient;
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org