You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/12 20:03:20 UTC
[35/50] [abbrv] hadoop git commit: HDFS-11546. Federation Router RPC
server. Contributed by Jason Kace and Inigo Goiri.
HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/48850c41
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/48850c41
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/48850c41
Branch: refs/heads/HDFS-10467
Commit: 48850c41710457332296ce4450c748f5218882d3
Parents: 3c88198
Author: Inigo Goiri <in...@apache.org>
Authored: Thu May 11 09:57:03 2017 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Tue Sep 12 13:02:19 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 38 +
.../resolver/FederationNamespaceInfo.java | 46 +-
.../federation/resolver/RemoteLocation.java | 46 +-
.../federation/router/ConnectionContext.java | 104 +
.../federation/router/ConnectionManager.java | 408 ++++
.../federation/router/ConnectionPool.java | 314 +++
.../federation/router/ConnectionPoolId.java | 117 ++
.../router/RemoteLocationContext.java | 38 +-
.../server/federation/router/RemoteMethod.java | 164 ++
.../server/federation/router/RemoteParam.java | 71 +
.../hdfs/server/federation/router/Router.java | 58 +-
.../federation/router/RouterRpcClient.java | 856 ++++++++
.../federation/router/RouterRpcServer.java | 1867 +++++++++++++++++-
.../src/main/resources/hdfs-default.xml | 95 +
.../server/federation/FederationTestUtils.java | 80 +-
.../hdfs/server/federation/MockResolver.java | 90 +-
.../server/federation/RouterConfigBuilder.java | 20 +-
.../server/federation/RouterDFSCluster.java | 535 +++--
.../server/federation/router/TestRouter.java | 31 +-
.../server/federation/router/TestRouterRpc.java | 869 ++++++++
.../router/TestRouterRpcMultiDestination.java | 216 ++
21 files changed, 5675 insertions(+), 388 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48850c41/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index bb3087c..341a502 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1120,6 +1120,44 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// HDFS Router-based federation
public static final String FEDERATION_ROUTER_PREFIX =
"dfs.federation.router.";
+ public static final String DFS_ROUTER_DEFAULT_NAMESERVICE =
+ FEDERATION_ROUTER_PREFIX + "default.nameserviceId";
+ public static final String DFS_ROUTER_HANDLER_COUNT_KEY =
+ FEDERATION_ROUTER_PREFIX + "handler.count";
+ public static final int DFS_ROUTER_HANDLER_COUNT_DEFAULT = 10;
+ public static final String DFS_ROUTER_READER_QUEUE_SIZE_KEY =
+ FEDERATION_ROUTER_PREFIX + "reader.queue.size";
+ public static final int DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT = 100;
+ public static final String DFS_ROUTER_READER_COUNT_KEY =
+ FEDERATION_ROUTER_PREFIX + "reader.count";
+ public static final int DFS_ROUTER_READER_COUNT_DEFAULT = 1;
+ public static final String DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY =
+ FEDERATION_ROUTER_PREFIX + "handler.queue.size";
+ public static final int DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
+ public static final String DFS_ROUTER_RPC_BIND_HOST_KEY =
+ FEDERATION_ROUTER_PREFIX + "rpc-bind-host";
+ public static final int DFS_ROUTER_RPC_PORT_DEFAULT = 8888;
+ public static final String DFS_ROUTER_RPC_ADDRESS_KEY =
+ FEDERATION_ROUTER_PREFIX + "rpc-address";
+ public static final String DFS_ROUTER_RPC_ADDRESS_DEFAULT =
+ "0.0.0.0:" + DFS_ROUTER_RPC_PORT_DEFAULT;
+ public static final String DFS_ROUTER_RPC_ENABLE =
+ FEDERATION_ROUTER_PREFIX + "rpc.enable";
+ public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
+
+ // HDFS Router NN client
+ public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
+ FEDERATION_ROUTER_PREFIX + "connection.pool-size";
+ public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT =
+ 64;
+ public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN =
+ FEDERATION_ROUTER_PREFIX + "connection.pool.clean.ms";
+ public static final long DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT =
+ TimeUnit.MINUTES.toMillis(1);
+ public static final String DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS =
+ FEDERATION_ROUTER_PREFIX + "connection.clean.ms";
+ public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
+ TimeUnit.SECONDS.toMillis(10);
// HDFS Router State Store connection
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48850c41/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java
index bbaeca3..edcd308 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java
@@ -23,15 +23,14 @@ import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
* Represents information about a single nameservice/namespace in a federated
* HDFS cluster.
*/
-public class FederationNamespaceInfo
- implements Comparable<FederationNamespaceInfo>, RemoteLocationContext {
+public class FederationNamespaceInfo extends RemoteLocationContext {
/** Block pool identifier. */
- private String blockPoolId;
+ private final String blockPoolId;
/** Cluster identifier. */
- private String clusterId;
+ private final String clusterId;
/** Nameservice identifier. */
- private String nameserviceId;
+ private final String nameserviceId;
public FederationNamespaceInfo(String bpId, String clId, String nsId) {
this.blockPoolId = bpId;
@@ -39,15 +38,16 @@ public class FederationNamespaceInfo
this.nameserviceId = nsId;
}
- /**
- * The HDFS nameservice id for this namespace.
- *
- * @return Nameservice identifier.
- */
+ @Override
public String getNameserviceId() {
return this.nameserviceId;
}
+ @Override
+ public String getDest() {
+ return this.nameserviceId;
+ }
+
/**
* The HDFS cluster id for this namespace.
*
@@ -67,33 +67,7 @@ public class FederationNamespaceInfo
}
@Override
- public int hashCode() {
- return this.nameserviceId.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null) {
- return false;
- } else if (obj instanceof FederationNamespaceInfo) {
- return this.compareTo((FederationNamespaceInfo) obj) == 0;
- } else {
- return false;
- }
- }
-
- @Override
- public int compareTo(FederationNamespaceInfo info) {
- return this.nameserviceId.compareTo(info.getNameserviceId());
- }
-
- @Override
public String toString() {
return this.nameserviceId + "->" + this.blockPoolId + ":" + this.clusterId;
}
-
- @Override
- public String getDest() {
- return this.nameserviceId;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48850c41/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
index eef136d..6aa12ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
@@ -17,34 +17,51 @@
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
-import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
/**
* A single in a remote namespace consisting of a nameservice ID
* and a HDFS path.
*/
-public class RemoteLocation implements RemoteLocationContext {
+public class RemoteLocation extends RemoteLocationContext {
/** Identifier of the remote namespace for this location. */
- private String nameserviceId;
+ private final String nameserviceId;
+ /** Identifier of the namenode in the namespace for this location. */
+ private final String namenodeId;
/** Path in the remote location. */
- private String path;
+ private final String path;
/**
* Create a new remote location.
*
+ * @param nsId
+ * @param pPath
+ */
+ public RemoteLocation(String nsId, String pPath) {
+ this(nsId, null, pPath);
+ }
+
+ /**
+ * Create a new remote location pointing to a particular namenode in the
+ * namespace.
+ *
* @param nsId Destination namespace.
* @param pPath Path in the destination namespace.
*/
- public RemoteLocation(String nsId, String pPath) {
+ public RemoteLocation(String nsId, String nnId, String pPath) {
this.nameserviceId = nsId;
+ this.namenodeId = nnId;
this.path = pPath;
}
@Override
public String getNameserviceId() {
- return this.nameserviceId;
+ String ret = this.nameserviceId;
+ if (this.namenodeId != null) {
+ ret += "-" + this.namenodeId;
+ }
+ return ret;
}
@Override
@@ -54,21 +71,6 @@ public class RemoteLocation implements RemoteLocationContext {
@Override
public String toString() {
- return this.nameserviceId + "->" + this.path;
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder(17, 31)
- .append(this.nameserviceId)
- .append(this.path)
- .toHashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- return (obj != null &&
- obj.getClass() == this.getClass() &&
- obj.hashCode() == this.hashCode());
+ return getNameserviceId() + "->" + this.path;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48850c41/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
new file mode 100644
index 0000000..1d27b51
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.ipc.RPC;
+
+/**
+ * Context to track a connection in a {@link ConnectionPool}. When a client uses
+ * a connection, it increments a counter to mark it as active. Once the client
+ * is done with the connection, it decreases the counter. It also takes care of
+ * closing the connection once is not active.
+ */
+public class ConnectionContext {
+
+ /** Client for the connection. */
+ private final ProxyAndInfo<ClientProtocol> client;
+ /** How many threads are using this connection. */
+ private int numThreads = 0;
+ /** If the connection is closed. */
+ private boolean closed = false;
+
+
+ public ConnectionContext(ProxyAndInfo<ClientProtocol> connection) {
+ this.client = connection;
+ }
+
+ /**
+ * Check if the connection is active.
+ *
+ * @return True if the connection is active.
+ */
+ public synchronized boolean isActive() {
+ return this.numThreads > 0;
+ }
+
+ /**
+ * Check if the connection is closed.
+ *
+ * @return If the connection is closed.
+ */
+ public synchronized boolean isClosed() {
+ return this.closed;
+ }
+
+ /**
+ * Check if the connection can be used. It checks if the connection is used by
+ * another thread or already closed.
+ *
+ * @return True if the connection can be used.
+ */
+ public synchronized boolean isUsable() {
+ return !isActive() && !isClosed();
+ }
+
+ /**
+ * Get the connection client.
+ *
+ * @return Connection client.
+ */
+ public synchronized ProxyAndInfo<ClientProtocol> getClient() {
+ this.numThreads++;
+ return this.client;
+ }
+
+ /**
+ * Release this connection. If the connection was closed, close the proxy.
+ * Otherwise, mark the connection as not used by us anymore.
+ */
+ public synchronized void release() {
+ if (--this.numThreads == 0 && this.closed) {
+ close();
+ }
+ }
+
+ /**
+ * We will not use this connection anymore. If it's not being used, we close
+ * it. Otherwise, we let release() do it once we are done with it.
+ */
+ public synchronized void close() {
+ this.closed = true;
+ if (this.numThreads == 0) {
+ ClientProtocol proxy = this.client.getProxy();
+ // Nobody should be using this anymore so it should close right away
+ RPC.stopProxy(proxy);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48850c41/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
new file mode 100644
index 0000000..d93d498
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a pool of connections for the {@link Router} to be able to open
+ * many connections to many Namenodes.
+ */
+public class ConnectionManager {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ConnectionManager.class);
+
+ /** Number of parallel new connections to create. */
+ protected static final int MAX_NEW_CONNECTIONS = 100;
+
+ /** Minimum amount of active connections: 50%. */
+ protected static final float MIN_ACTIVE_RATIO = 0.5f;
+
+
+ /** Configuration for the connection manager, pool and sockets. */
+ private final Configuration conf;
+
+ /** Min number of connections per user + nn. */
+ private final int minSize = 1;
+ /** Max number of connections per user + nn. */
+ private final int maxSize;
+
+ /** How often we close a pool for a particular user + nn. */
+ private final long poolCleanupPeriodMs;
+ /** How often we close a connection in a pool. */
+ private final long connectionCleanupPeriodMs;
+
+ /** Map of connection pools, one pool per user + NN. */
+ private final Map<ConnectionPoolId, ConnectionPool> pools;
+ /** Lock for accessing pools. */
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final Lock readLock = readWriteLock.readLock();
+ private final Lock writeLock = readWriteLock.writeLock();
+
+ /** Queue for creating new connections. */
+ private final BlockingQueue<ConnectionPool> creatorQueue =
+ new ArrayBlockingQueue<>(MAX_NEW_CONNECTIONS);
+ /** Create new connections asynchronously. */
+ private final ConnectionCreator creator;
+ /** Periodic executor to remove stale connection pools. */
+ private final ScheduledThreadPoolExecutor cleaner =
+ new ScheduledThreadPoolExecutor(1);
+
+ /** If the connection manager is running. */
+ private boolean running = false;
+
+
+ /**
+ * Creates a proxy client connection pool manager.
+ *
+ * @param config Configuration for the connections.
+ * @param minPoolSize Min size of the connection pool.
+ * @param maxPoolSize Max size of the connection pool.
+ */
+ public ConnectionManager(Configuration config) {
+ this.conf = config;
+
+ // Configure minimum and maximum connection pools
+ this.maxSize = this.conf.getInt(
+ DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
+ DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT);
+
+ // Map with the connections indexed by UGI and Namenode
+ this.pools = new HashMap<>();
+
+ // Create connections in a thread asynchronously
+ this.creator = new ConnectionCreator(creatorQueue);
+ this.creator.setDaemon(true);
+
+ // Cleanup periods
+ this.poolCleanupPeriodMs = this.conf.getLong(
+ DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN,
+ DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT);
+ LOG.info("Cleaning connection pools every {} seconds",
+ TimeUnit.MILLISECONDS.toSeconds(this.poolCleanupPeriodMs));
+ this.connectionCleanupPeriodMs = this.conf.getLong(
+ DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS,
+ DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT);
+ LOG.info("Cleaning connections every {} seconds",
+ TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs));
+ }
+
+ /**
+ * Start the connection manager.
+ */
+ public void start() {
+ // Start the thread that creates connections asynchronously
+ this.creator.start();
+
+ // Schedule a task to remove stale connection pools and sockets
+ long recyleTimeMs = Math.min(
+ poolCleanupPeriodMs, connectionCleanupPeriodMs);
+ LOG.info("Cleaning every {} seconds",
+ TimeUnit.MILLISECONDS.toSeconds(recyleTimeMs));
+ this.cleaner.scheduleAtFixedRate(
+ new CleanupTask(), 0, recyleTimeMs, TimeUnit.MILLISECONDS);
+
+ // Mark the manager as running
+ this.running = true;
+ }
+
+ /**
+ * Stop the connection manager by closing all the pools.
+ */
+ public void close() {
+ this.creator.shutdown();
+ this.cleaner.shutdown();
+ this.running = false;
+
+ writeLock.lock();
+ try {
+ for (ConnectionPool pool : this.pools.values()) {
+ pool.close();
+ }
+ this.pools.clear();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Fetches the next available proxy client in the pool. Each client connection
+ * is reserved for a single user and cannot be reused until free.
+ *
+ * @param ugi User group information.
+ * @param nnAddress Namenode address for the connection.
+ * @return Proxy client to connect to nnId as UGI.
+ * @throws IOException If the connection cannot be obtained.
+ */
+ public ConnectionContext getConnection(
+ UserGroupInformation ugi, String nnAddress) throws IOException {
+
+ // Check if the manager is shutdown
+ if (!this.running) {
+ LOG.error(
+ "Cannot get a connection to {} because the manager isn't running",
+ nnAddress);
+ return null;
+ }
+
+ // Try to get the pool if created
+ ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress);
+ ConnectionPool pool = null;
+ readLock.lock();
+ try {
+ pool = this.pools.get(connectionId);
+ } finally {
+ readLock.unlock();
+ }
+
+ // Create the pool if not created before
+ if (pool == null) {
+ writeLock.lock();
+ try {
+ pool = this.pools.get(connectionId);
+ if (pool == null) {
+ pool = new ConnectionPool(
+ this.conf, nnAddress, ugi, this.minSize, this.maxSize);
+ this.pools.put(connectionId, pool);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ ConnectionContext conn = pool.getConnection();
+
+ // Add a new connection to the pool if it wasn't usable
+ if (conn == null || !conn.isUsable()) {
+ if (!this.creatorQueue.offer(pool)) {
+ LOG.error("Cannot add more than {} connections at the same time",
+ MAX_NEW_CONNECTIONS);
+ }
+ }
+
+ if (conn != null && conn.isClosed()) {
+ LOG.error("We got a closed connection from {}", pool);
+ conn = null;
+ }
+
+ return conn;
+ }
+
+ /**
+ * Get the number of connection pools.
+ *
+ * @return Number of connection pools.
+ */
+ public int getNumConnectionPools() {
+ readLock.lock();
+ try {
+ return pools.size();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Get number of open connections.
+ *
+ * @return Number of open connections.
+ */
+ public int getNumConnections() {
+ int total = 0;
+ readLock.lock();
+ try {
+ for (ConnectionPool pool : this.pools.values()) {
+ total += pool.getNumConnections();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return total;
+ }
+
+ /**
+ * Get number of active connections.
+ *
+ * @return Number of active connections.
+ */
+ public int getNumActiveConnections() {
+ int total = 0;
+ readLock.lock();
+ try {
+ for (ConnectionPool pool : this.pools.values()) {
+ total += pool.getNumActiveConnections();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return total;
+ }
+
+ /**
+ * Get the number of connections to be created.
+ *
+ * @return Number of connections to be created.
+ */
+ public int getNumCreatingConnections() {
+ return this.creatorQueue.size();
+ }
+
+ /**
+ * Removes stale connections not accessed recently from the pool. This is
+ * invoked periodically.
+ */
+ private class CleanupTask implements Runnable {
+
+ @Override
+ public void run() {
+ long currentTime = Time.now();
+ List<ConnectionPoolId> toRemove = new LinkedList<>();
+
+ // Look for stale pools
+ readLock.lock();
+ try {
+ for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) {
+ ConnectionPool pool = entry.getValue();
+ long lastTimeActive = pool.getLastActiveTime();
+ boolean isStale =
+ currentTime > (lastTimeActive + poolCleanupPeriodMs);
+ if (lastTimeActive > 0 && isStale) {
+ // Remove this pool
+ LOG.debug("Closing and removing stale pool {}", pool);
+ pool.close();
+ ConnectionPoolId poolId = entry.getKey();
+ toRemove.add(poolId);
+ } else {
+ // Keep this pool but clean connections inside
+ LOG.debug("Cleaning up {}", pool);
+ cleanup(pool);
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+
+ // Remove stale pools
+ if (!toRemove.isEmpty()) {
+ writeLock.lock();
+ try {
+ for (ConnectionPoolId poolId : toRemove) {
+ pools.remove(poolId);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Clean the unused connections for this pool.
+ *
+ * @param pool Connection pool to cleanup.
+ */
+ private void cleanup(ConnectionPool pool) {
+ if (pool.getNumConnections() > pool.getMinSize()) {
+ // Check if the pool hasn't been active in a while or not 50% are used
+ long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
+ int total = pool.getNumConnections();
+ int active = getNumActiveConnections();
+ if (timeSinceLastActive > connectionCleanupPeriodMs ||
+ active < MIN_ACTIVE_RATIO * total) {
+ // Remove and close 1 connection
+ List<ConnectionContext> conns = pool.removeConnections(1);
+ for (ConnectionContext conn : conns) {
+ conn.close();
+ }
+ LOG.debug("Removed connection {} used {} seconds ago. " +
+ "Pool has {}/{} connections", pool.getConnectionPoolId(),
+ TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive),
+ pool.getNumConnections(), pool.getMaxSize());
+ }
+ }
+ }
+ }
+
+ /**
+ * Thread that creates connections asynchronously.
+ */
+ private static class ConnectionCreator extends Thread {
+ /** If the creator is running. */
+ private boolean running = true;
+ /** Queue to push work to. */
+ private BlockingQueue<ConnectionPool> queue;
+
+ ConnectionCreator(BlockingQueue<ConnectionPool> blockingQueue) {
+ super("Connection creator");
+ this.queue = blockingQueue;
+ }
+
+ @Override
+ public void run() {
+ while (this.running) {
+ try {
+ ConnectionPool pool = this.queue.take();
+ try {
+ int total = pool.getNumConnections();
+ int active = pool.getNumActiveConnections();
+ if (pool.getNumConnections() < pool.getMaxSize() &&
+ active >= MIN_ACTIVE_RATIO * total) {
+ ConnectionContext conn = pool.newConnection();
+ pool.addConnection(conn);
+ } else {
+ LOG.debug("Cannot add more than {} connections to {}",
+ pool.getMaxSize(), pool);
+ }
+ } catch (IOException e) {
+ LOG.error("Cannot create a new connection", e);
+ }
+ } catch (InterruptedException e) {
+ LOG.error("The connection creator was interrupted");
+ this.running = false;
+ }
+ }
+ }
+
+ /**
+ * Stop this connection creator.
+ */
+ public void shutdown() {
+ this.running = false;
+ this.interrupt();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48850c41/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
new file mode 100644
index 0000000..f76f621
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains a pool of connections for each User (including tokens) + NN. The
+ * RPC client maintains a single socket, to achieve throughput similar to a NN,
+ * each request is multiplexed across multiple sockets/connections from a
+ * pool.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ConnectionPool {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ConnectionPool.class);
+
+
+ /** Configuration settings for the connection pool. */
+ private final Configuration conf;
+
+ /** Identifier for this connection pool. */
+ private final ConnectionPoolId connectionPoolId;
+ /** Namenode this pool connects to. */
+ private final String namenodeAddress;
+ /** User for this connections. */
+ private final UserGroupInformation ugi;
+
+ /** Pool of connections. We mimic a COW array. */
+ private volatile List<ConnectionContext> connections = new ArrayList<>();
+ /** Connection index for round-robin. */
+ private final AtomicInteger clientIndex = new AtomicInteger(0);
+
+ /** Min number of connections per user. */
+ private final int minSize;
+ /** Max number of connections per user. */
+ private final int maxSize;
+
+ /** The last time a connection was active. */
+ private volatile long lastActiveTime = 0;
+
+
+ protected ConnectionPool(Configuration config, String address,
+ UserGroupInformation user, int minPoolSize, int maxPoolSize)
+ throws IOException {
+
+ this.conf = config;
+
+ // Connection pool target
+ this.ugi = user;
+ this.namenodeAddress = address;
+ this.connectionPoolId =
+ new ConnectionPoolId(this.ugi, this.namenodeAddress);
+
+ // Set configuration parameters for the pool
+ this.minSize = minPoolSize;
+ this.maxSize = maxPoolSize;
+
+ // Add minimum connections to the pool
+ for (int i=0; i<this.minSize; i++) {
+ ConnectionContext newConnection = newConnection();
+ this.connections.add(newConnection);
+ }
+ LOG.debug("Created connection pool \"{}\" with {} connections",
+ this.connectionPoolId, this.minSize);
+ }
+
+ /**
+ * Get the maximum number of connections allowed in this pool.
+ *
+ * @return Maximum number of connections.
+ */
+ protected int getMaxSize() {
+ return this.maxSize;
+ }
+
+ /**
+ * Get the minimum number of connections in this pool.
+ *
+ * @return Minimum number of connections.
+ */
+ protected int getMinSize() {
+ return this.minSize;
+ }
+
+ /**
+ * Get the connection pool identifier.
+ *
+ * @return Connection pool identifier.
+ */
+ protected ConnectionPoolId getConnectionPoolId() {
+ return this.connectionPoolId;
+ }
+
+ /**
+ * Return the next connection round-robin.
+ *
+ * @return Connection context.
+ */
+ protected ConnectionContext getConnection() {
+
+ this.lastActiveTime = Time.now();
+
+ // Get a connection from the pool following round-robin
+ ConnectionContext conn = null;
+ List<ConnectionContext> tmpConnections = this.connections;
+ int size = tmpConnections.size();
+ int threadIndex = this.clientIndex.getAndIncrement();
+ for (int i=0; i<size; i++) {
+ int index = (threadIndex + i) % size;
+ conn = tmpConnections.get(index);
+ if (conn != null && !conn.isUsable()) {
+ return conn;
+ }
+ }
+
+ // We return a connection even if it's active
+ return conn;
+ }
+
+ /**
+ * Add a connection to the current pool. It uses a Copy-On-Write approach.
+ *
+ * @param conns New connections to add to the pool.
+ */
+ public synchronized void addConnection(ConnectionContext conn) {
+ List<ConnectionContext> tmpConnections = new ArrayList<>(this.connections);
+ tmpConnections.add(conn);
+ this.connections = tmpConnections;
+
+ this.lastActiveTime = Time.now();
+ }
+
+ /**
+ * Remove connections from the current pool.
+ *
+ * @param num Number of connections to remove.
+ * @return Removed connections.
+ */
+ public synchronized List<ConnectionContext> removeConnections(int num) {
+ List<ConnectionContext> removed = new LinkedList<>();
+
+ // Remove and close the last connection
+ List<ConnectionContext> tmpConnections = new ArrayList<>();
+ for (int i=0; i<this.connections.size(); i++) {
+ ConnectionContext conn = this.connections.get(i);
+ if (i < this.minSize || i < this.connections.size() - num) {
+ tmpConnections.add(conn);
+ } else {
+ removed.add(conn);
+ }
+ }
+ this.connections = tmpConnections;
+
+ return removed;
+ }
+
+ /**
+ * Close the connection pool.
+ */
+ protected synchronized void close() {
+ long timeSinceLastActive = TimeUnit.MILLISECONDS.toSeconds(
+ Time.now() - getLastActiveTime());
+ LOG.debug("Shutting down connection pool \"{}\" used {} seconds ago",
+ this.connectionPoolId, timeSinceLastActive);
+
+ for (ConnectionContext connection : this.connections) {
+ connection.close();
+ }
+ this.connections.clear();
+ }
+
+ /**
+ * Number of connections in the pool.
+ *
+ * @return Number of connections.
+ */
+ protected int getNumConnections() {
+ return this.connections.size();
+ }
+
+ /**
+ * Number of active connections in the pool.
+ *
+ * @return Number of active connections.
+ */
+ protected int getNumActiveConnections() {
+ int ret = 0;
+
+ List<ConnectionContext> tmpConnections = this.connections;
+ for (ConnectionContext conn : tmpConnections) {
+ if (conn.isActive()) {
+ ret++;
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Get the last time the connection pool was used.
+ *
+ * @return Last time the connection pool was used.
+ */
+ protected long getLastActiveTime() {
+ return this.lastActiveTime;
+ }
+
+ @Override
+ public String toString() {
+ return this.connectionPoolId.toString();
+ }
+
+ /**
+ * Create a new proxy wrapper for a client NN connection.
+ * @return Proxy for the target ClientProtocol that contains the user's
+ * security context.
+ * @throws IOException
+ */
+ public ConnectionContext newConnection() throws IOException {
+ return newConnection(this.conf, this.namenodeAddress, this.ugi);
+ }
+
+ /**
+ * Creates a proxy wrapper for a client NN connection. Each proxy contains
+ * context for a single user/security context. To maximize throughput it is
+ * recommended to use multiple connection per user+server, allowing multiple
+ * writes and reads to be dispatched in parallel.
+ *
+ * @param conf Configuration for the connection.
+ * @param nnAddress Address of server supporting the ClientProtocol.
+ * @param ugi User context.
+ * @return Proxy for the target ClientProtocol that contains the user's
+ * security context.
+ * @throws IOException If it cannot be created.
+ */
+ protected static ConnectionContext newConnection(Configuration conf,
+ String nnAddress, UserGroupInformation ugi)
+ throws IOException {
+ RPC.setProtocolEngine(
+ conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
+
+ final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
+ conf,
+ HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
+ HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
+ HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
+ HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
+ HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
+
+ SocketFactory factory = SocketFactory.getDefault();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ SaslRpcServer.init(conf);
+ }
+ InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
+ final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
+ ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
+ ClientNamenodeProtocolPB.class, version, socket, ugi, conf,
+ factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
+ ClientProtocol client = new ClientNamenodeProtocolTranslatorPB(proxy);
+ Text dtService = SecurityUtil.buildTokenService(socket);
+
+ ProxyAndInfo<ClientProtocol> clientProxy =
+ new ProxyAndInfo<ClientProtocol>(client, dtService, socket);
+ ConnectionContext connection = new ConnectionContext(clientProxy);
+ return connection;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48850c41/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
new file mode 100644
index 0000000..a3a78de
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * Identifier for a connection for a user to a namenode.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
+
+ /** Namenode identifier. */
+ private final String nnId;
+ /** Information about the user. */
+ private final UserGroupInformation ugi;
+
+ /**
+ * New connection pool identifier.
+ *
+ * @param ugi Information of the user issuing the request.
+ * @param nnId Namenode address with port.
+ */
+ public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) {
+ this.nnId = nnId;
+ this.ugi = ugi;
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = new HashCodeBuilder(17, 31)
+ .append(this.nnId)
+ .append(this.ugi.toString())
+ .append(this.getTokenIds())
+ .toHashCode();
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ConnectionPoolId) {
+ ConnectionPoolId other = (ConnectionPoolId) o;
+ if (!this.nnId.equals(other.nnId)) {
+ return false;
+ }
+ if (!this.ugi.toString().equals(other.ugi.toString())) {
+ return false;
+ }
+ String thisTokens = this.getTokenIds().toString();
+ String otherTokens = other.getTokenIds().toString();
+ return thisTokens.equals(otherTokens);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return this.ugi + " " + this.getTokenIds() + "->" + this.nnId;
+ }
+
+ @Override
+ public int compareTo(ConnectionPoolId other) {
+ int ret = this.nnId.compareTo(other.nnId);
+ if (ret == 0) {
+ ret = this.ugi.toString().compareTo(other.ugi.toString());
+ }
+ if (ret == 0) {
+ String thisTokens = this.getTokenIds().toString();
+ String otherTokens = other.getTokenIds().toString();
+ ret = thisTokens.compareTo(otherTokens);
+ }
+ return ret;
+ }
+
+ /**
+ * Get the token identifiers for this connection.
+ * @return List with the token identifiers.
+ */
+ private List<String> getTokenIds() {
+ List<String> tokenIds = new ArrayList<>();
+ Collection<Token<? extends TokenIdentifier>> tokens = this.ugi.getTokens();
+ for (Token<? extends TokenIdentifier> token : tokens) {
+ byte[] tokenIdBytes = token.getIdentifier();
+ String tokenId = Arrays.toString(tokenIdBytes);
+ tokenIds.add(tokenId);
+ }
+ Collections.sort(tokenIds);
+ return tokenIds;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48850c41/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
index da6066b..a90c460 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
@@ -17,22 +17,52 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
/**
- * Interface for objects that are unique to a namespace.
+ * Base class for objects that are unique to a namespace.
*/
-public interface RemoteLocationContext {
+public abstract class RemoteLocationContext
+ implements Comparable<RemoteLocationContext> {
/**
* Returns an identifier for a unique namespace.
*
* @return Namespace identifier.
*/
- String getNameserviceId();
+ public abstract String getNameserviceId();
/**
* Destination in this location. For example the path in a remote namespace.
*
* @return Destination in this location.
*/
- String getDest();
+ public abstract String getDest();
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 31)
+ .append(getNameserviceId())
+ .append(getDest())
+ .toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof RemoteLocationContext) {
+ RemoteLocationContext other = (RemoteLocationContext) obj;
+ return this.getNameserviceId().equals(other.getNameserviceId()) &&
+ this.getDest().equals(other.getDest());
+ }
+ return false;
+ }
+
+ @Override
+ public int compareTo(RemoteLocationContext info) {
+ int ret = this.getNameserviceId().compareTo(info.getNameserviceId());
+ if (ret == 0) {
+ ret = this.getDest().compareTo(info.getDest());
+ }
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48850c41/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
new file mode 100644
index 0000000..cd57d45
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Determines the remote client protocol method and the parameter list for a
+ * specific location.
+ */
+public class RemoteMethod {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteMethod.class);
+
+
+ /** List of parameters: static and dynamic values, matchings types. */
+ private final Object[] params;
+ /** List of method parameters types, matches parameters. */
+ private final Class<?>[] types;
+ /** String name of the ClientProtocol method. */
+ private final String methodName;
+
+ /**
+ * Create a method with no parameters.
+ *
+ * @param method The string name of the ClientProtocol method.
+ */
+ public RemoteMethod(String method) {
+ this.params = null;
+ this.types = null;
+ this.methodName = method;
+ }
+
+ /**
+ * Creates a remote method generator.
+ *
+ * @param method The string name of the ClientProtocol method.
+ * @param pTypes A list of types to use to locate the specific method.
+ * @param pParams A list of parameters for the method. The order of the
+ * parameter list must match the order and number of the types.
+ * Parameters are grouped into 2 categories:
+ * <ul>
+ * <li>Static parameters that are immutable across locations.
+ * <li>Dynamic parameters that are determined for each location by a
+ * RemoteParam object. To specify a dynamic parameter, pass an
+ * instance of RemoteParam in place of the parameter value.
+ * </ul>
+ * @throws IOException If the types and parameter lists are not valid.
+ */
+ public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams)
+ throws IOException {
+
+ if (pParams.length != pTypes.length) {
+ throw new IOException("Invalid parameters for method " + method);
+ }
+
+ this.params = pParams;
+ this.types = pTypes;
+ this.methodName = method;
+ }
+
+ /**
+ * Get the represented java method.
+ *
+ * @return Method
+ * @throws IOException If the method cannot be found.
+ */
+ public Method getMethod() throws IOException {
+ try {
+ if (types != null) {
+ return ClientProtocol.class.getDeclaredMethod(methodName, types);
+ } else {
+ return ClientProtocol.class.getDeclaredMethod(methodName);
+ }
+ } catch (NoSuchMethodException e) {
+ // Re-throw as an IOException
+ LOG.error("Cannot get method {} with types {}",
+ methodName, Arrays.toString(types), e);
+ throw new IOException(e);
+ } catch (SecurityException e) {
+ LOG.error("Cannot access method {} with types {}",
+ methodName, Arrays.toString(types), e);
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Get the calling types for this method.
+ *
+ * @return An array of calling types.
+ */
+ public Class<?>[] getTypes() {
+ return this.types;
+ }
+
+ /**
+ * Generate a list of parameters for this specific location using no context.
+ *
+ * @return A list of parameters for the method customized for the location.
+ */
+ public Object[] getParams() {
+ return this.getParams(null);
+ }
+
+ /**
+ * Get the name of the method.
+ *
+ * @return Name of the method.
+ */
+ public String getMethodName() {
+ return this.methodName;
+ }
+
+ /**
+ * Generate a list of parameters for this specific location. Parameters are
+ * grouped into 2 categories:
+ * <ul>
+ * <li>Static parameters that are immutable across locations.
+ * <li>Dynamic parameters that are determined for each location by a
+ * RemoteParam object.
+ * </ul>
+ *
+ * @param context The context identifying the location.
+ * @return A list of parameters for the method customized for the location.
+ */
+ public Object[] getParams(RemoteLocationContext context) {
+ if (this.params == null) {
+ return new Object[] {};
+ }
+ Object[] objList = new Object[this.params.length];
+ for (int i = 0; i < this.params.length; i++) {
+ Object currentObj = this.params[i];
+ if (currentObj instanceof RemoteParam) {
+ // Map the parameter using the context
+ RemoteParam paramGetter = (RemoteParam) currentObj;
+ objList[i] = paramGetter.getParameterForContext(context);
+ } else {
+ objList[i] = currentObj;
+ }
+ }
+ return objList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48850c41/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
new file mode 100644
index 0000000..8816ff6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.util.Map;
+
+/**
+ * A dynamically assignable parameter that is location-specific.
+ * <p>
+ * There are 2 ways this mapping is determined:
+ * <ul>
+ * <li>Default: Uses the RemoteLocationContext's destination
+ * <li>Map: Uses the value of the RemoteLocationContext key provided in the
+ * parameter map.
+ * </ul>
+ */
+public class RemoteParam {
+
+ private final Map<? extends Object, ? extends Object> paramMap;
+
+ /**
+ * Constructs a default remote parameter. Always maps the value to the
+ * destination of the provided RemoveLocationContext.
+ */
+ public RemoteParam() {
+ this.paramMap = null;
+ }
+
+ /**
+ * Constructs a map based remote parameter. Determines the value using the
+ * provided RemoteLocationContext as a key into the map.
+ *
+ * @param map Map with RemoteLocationContext keys.
+ */
+ public RemoteParam(
+ Map<? extends RemoteLocationContext, ? extends Object> map) {
+ this.paramMap = map;
+ }
+
+ /**
+ * Determine the appropriate value for this parameter based on the location.
+ *
+ * @param context Context identifying the location.
+ * @return A parameter specific to this location.
+ */
+ public Object getParameterForContext(RemoteLocationContext context) {
+ if (context == null) {
+ return null;
+ } else if (this.paramMap != null) {
+ return this.paramMap.get(context);
+ } else {
+ // Default case
+ return context.getDest();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48850c41/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index fe0d02a..019a5cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -22,12 +22,14 @@ import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.new
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@@ -36,6 +38,8 @@ import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Router that provides a unified view of multiple federated HDFS clusters. It
@@ -60,7 +64,7 @@ import org.apache.hadoop.util.StringUtils;
@InterfaceStability.Evolving
public class Router extends CompositeService {
- private static final Log LOG = LogFactory.getLog(Router.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Router.class);
/** Configuration for the Router. */
@@ -71,6 +75,7 @@ public class Router extends CompositeService {
/** RPC interface to the client. */
private RouterRpcServer rpcServer;
+ private InetSocketAddress rpcAddress;
/** Interface with the State Store. */
private StateStoreService stateStore;
@@ -105,9 +110,6 @@ public class Router extends CompositeService {
protected void serviceInit(Configuration configuration) throws Exception {
this.conf = configuration;
- // TODO Interface to the State Store
- this.stateStore = null;
-
// Resolver to track active NNs
this.namenodeResolver = newActiveNamenodeResolver(
this.conf, this.stateStore);
@@ -122,6 +124,15 @@ public class Router extends CompositeService {
throw new IOException("Cannot find subcluster resolver");
}
+ if (conf.getBoolean(
+ DFSConfigKeys.DFS_ROUTER_RPC_ENABLE,
+ DFSConfigKeys.DFS_ROUTER_RPC_ENABLE_DEFAULT)) {
+ // Create RPC server
+ this.rpcServer = createRpcServer();
+ addService(this.rpcServer);
+ this.setRpcServerAddress(rpcServer.getRpcAddress());
+ }
+
super.serviceInit(conf);
}
@@ -171,11 +182,13 @@ public class Router extends CompositeService {
router.init(conf);
router.start();
} catch (Throwable e) {
- LOG.error("Failed to start router.", e);
+ LOG.error("Failed to start router", e);
terminate(1, e);
}
}
+
+
/////////////////////////////////////////////////////////
// RPC Server
/////////////////////////////////////////////////////////
@@ -183,7 +196,7 @@ public class Router extends CompositeService {
/**
* Create a new Router RPC server to proxy ClientProtocol requests.
*
- * @return RouterRpcServer
+ * @return New Router RPC Server.
* @throws IOException If the router RPC server was not started.
*/
protected RouterRpcServer createRpcServer() throws IOException {
@@ -200,6 +213,35 @@ public class Router extends CompositeService {
return this.rpcServer;
}
+ /**
+ * Set the current RPC socket for the router.
+ *
+ * @param rpcAddress RPC address.
+ */
+ protected void setRpcServerAddress(InetSocketAddress address) {
+ this.rpcAddress = address;
+
+ // Use the RPC address as our unique router Id
+ if (this.rpcAddress != null) {
+ try {
+ String hostname = InetAddress.getLocalHost().getHostName();
+ setRouterId(hostname + ":" + this.rpcAddress.getPort());
+ } catch (UnknownHostException ex) {
+ LOG.error("Cannot set unique router ID, address not resolvable {}",
+ this.rpcAddress);
+ }
+ }
+ }
+
+ /**
+ * Get the current RPC socket address for the router.
+ *
+ * @return InetSocketAddress
+ */
+ public InetSocketAddress getRpcServerAddress() {
+ return this.rpcAddress;
+ }
+
/////////////////////////////////////////////////////////
// Submodule getters
/////////////////////////////////////////////////////////
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org