You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/12 20:03:23 UTC
[38/50] [abbrv] hadoop git commit: HDFS-10646. Federation admin tool.
Contributed by Inigo Goiri.
HDFS-10646. Federation admin tool. Contributed by Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae5673b7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae5673b7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae5673b7
Branch: refs/heads/HDFS-10467
Commit: ae5673b75e34046d149243fc99e14068b4deea4b
Parents: 4cab5db
Author: Inigo Goiri <in...@apache.org>
Authored: Tue Aug 8 14:44:43 2017 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Tue Sep 12 13:02:20 2017 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/pom.xml | 1 +
.../hadoop-hdfs/src/main/bin/hdfs | 5 +
.../hadoop-hdfs/src/main/bin/hdfs.cmd | 7 +-
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 19 ++
.../hdfs/protocolPB/RouterAdminProtocolPB.java | 44 +++
...uterAdminProtocolServerSideTranslatorPB.java | 151 ++++++++
.../RouterAdminProtocolTranslatorPB.java | 150 ++++++++
.../resolver/MembershipNamenodeResolver.java | 34 +-
.../hdfs/server/federation/router/Router.java | 52 +++
.../federation/router/RouterAdminServer.java | 183 ++++++++++
.../server/federation/router/RouterClient.java | 76 +++++
.../hdfs/tools/federation/RouterAdmin.java | 341 +++++++++++++++++++
.../hdfs/tools/federation/package-info.java | 28 ++
.../src/main/proto/RouterProtocol.proto | 47 +++
.../src/main/resources/hdfs-default.xml | 46 +++
.../server/federation/RouterConfigBuilder.java | 26 ++
.../server/federation/RouterDFSCluster.java | 43 ++-
.../server/federation/StateStoreDFSCluster.java | 148 ++++++++
.../federation/router/TestRouterAdmin.java | 261 ++++++++++++++
19 files changed, 1644 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index cc7a975..93216db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -332,6 +332,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<include>editlog.proto</include>
<include>fsimage.proto</include>
<include>FederationProtocol.proto</include>
+ <include>RouterProtocol.proto</include>
</includes>
</source>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
index b1f44a4..d51a8e2 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
@@ -31,6 +31,7 @@ function hadoop_usage
hadoop_add_option "--hosts filename" "list of hosts to use in worker mode"
hadoop_add_option "--workers" "turn on worker mode"
+<<<<<<< HEAD
hadoop_add_subcommand "balancer" daemon "run a cluster balancing utility"
hadoop_add_subcommand "cacheadmin" admin "configure the HDFS cache"
hadoop_add_subcommand "classpath" client "prints the class path needed to get the hadoop jar and the required libraries"
@@ -42,6 +43,7 @@ function hadoop_usage
hadoop_add_subcommand "diskbalancer" daemon "Distributes data evenly among disks on a given node"
hadoop_add_subcommand "envvars" client "display computed Hadoop environment variables"
hadoop_add_subcommand "ec" admin "run a HDFS ErasureCoding CLI"
+ hadoop_add_subcommand "federation" admin "manage Router-based federation"
hadoop_add_subcommand "fetchdt" client "fetch a delegation token from the NameNode"
hadoop_add_subcommand "fsck" admin "run a DFS filesystem checking utility"
hadoop_add_subcommand "getconf" client "get config values from configuration"
@@ -181,6 +183,9 @@ function hdfscmd_case
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.federation.router.Router'
;;
+ federation)
+ HADOOP_CLASSNAME='org.apache.hadoop.hdfs.tools.federation.RouterAdmin'
+ ;;
secondarynamenode)
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
index b9853d6..53bdf70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
@@ -59,7 +59,7 @@ if "%1" == "--loglevel" (
)
)
- set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto router debug
+ set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto router federation debug
for %%i in ( %hdfscommands% ) do (
if %hdfs-command% == %%i set hdfscommand=true
)
@@ -184,6 +184,11 @@ goto :eof
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ROUTER_OPTS%
goto :eof
+:federation
+ set CLASS=org.apache.hadoop.hdfs.tools.federation.RouterAdmin
+ set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ROUTER_OPTS%
+ goto :eof
+
:debug
set CLASS=org.apache.hadoop.hdfs.tools.DebugAdmin
goto :eof
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 702729b..5fd0811 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1193,6 +1193,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String FEDERATION_STORE_PREFIX =
FEDERATION_ROUTER_PREFIX + "store.";
+ public static final String DFS_ROUTER_STORE_ENABLE =
+ FEDERATION_STORE_PREFIX + "enable";
+ public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true;
+
public static final String FEDERATION_STORE_SERIALIZER_CLASS =
DFSConfigKeys.FEDERATION_STORE_PREFIX + "serializer";
public static final Class<StateStoreSerializerPBImpl>
@@ -1219,6 +1223,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
+ // HDFS Router-based federation admin
+ public static final String DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY =
+ FEDERATION_ROUTER_PREFIX + "admin.handler.count";
+ public static final int DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT = 1;
+ public static final int DFS_ROUTER_ADMIN_PORT_DEFAULT = 8111;
+ public static final String DFS_ROUTER_ADMIN_ADDRESS_KEY =
+ FEDERATION_ROUTER_PREFIX + "admin-address";
+ public static final String DFS_ROUTER_ADMIN_ADDRESS_DEFAULT =
+ "0.0.0.0:" + DFS_ROUTER_ADMIN_PORT_DEFAULT;
+ public static final String DFS_ROUTER_ADMIN_BIND_HOST_KEY =
+ FEDERATION_ROUTER_PREFIX + "admin-bind-host";
+ public static final String DFS_ROUTER_ADMIN_ENABLE =
+ FEDERATION_ROUTER_PREFIX + "admin.enable";
+ public static final boolean DFS_ROUTER_ADMIN_ENABLE_DEFAULT = true;
+
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolPB.java
new file mode 100644
index 0000000..96fa794
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolPB.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+
+/**
+ * Protocol that admin clients use to communicate with the Router.
+ * Note: This extends the protocolbuffer service based interface to
+ * add annotations required for security.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY)
+@TokenInfo(DelegationTokenSelector.class)
+@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME,
+ protocolVersion = 1)
+public interface RouterAdminProtocolPB extends
+ RouterAdminProtocolService.BlockingInterface {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..415bbd9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProto;
+import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryRequestPBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryResponsePBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesRequestPBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryResponsePBImpl;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is used on the server side. Calls come across the wire for the
+ * protocol {@link RouterAdminProtocolPB}. This class translates the PB data
+ * types to the native data types used inside the HDFS Router as specified in
+ * the generic RouterAdminProtocol.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class RouterAdminProtocolServerSideTranslatorPB implements
+ RouterAdminProtocolPB {
+
+ private final RouterAdminServer server;
+
+ /**
+ * Constructor.
+ * @param server The Router admin server.
+ * @throws IOException
+ */
+ public RouterAdminProtocolServerSideTranslatorPB(RouterAdminServer server)
+ throws IOException {
+ this.server = server;
+ }
+
+ @Override
+ public AddMountTableEntryResponseProto addMountTableEntry(
+ RpcController controller, AddMountTableEntryRequestProto request)
+ throws ServiceException {
+
+ try {
+ AddMountTableEntryRequest req =
+ new AddMountTableEntryRequestPBImpl(request);
+ AddMountTableEntryResponse response = server.addMountTableEntry(req);
+ AddMountTableEntryResponsePBImpl responsePB =
+ (AddMountTableEntryResponsePBImpl)response;
+ return responsePB.getProto();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ /**
+ * Remove an entry from the mount table.
+ */
+ @Override
+ public RemoveMountTableEntryResponseProto removeMountTableEntry(
+ RpcController controller, RemoveMountTableEntryRequestProto request)
+ throws ServiceException {
+ try {
+ RemoveMountTableEntryRequest req =
+ new RemoveMountTableEntryRequestPBImpl(request);
+ RemoveMountTableEntryResponse response =
+ server.removeMountTableEntry(req);
+ RemoveMountTableEntryResponsePBImpl responsePB =
+ (RemoveMountTableEntryResponsePBImpl)response;
+ return responsePB.getProto();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ /**
+ * Get matching mount table entries.
+ */
+ @Override
+ public GetMountTableEntriesResponseProto getMountTableEntries(
+ RpcController controller, GetMountTableEntriesRequestProto request)
+ throws ServiceException {
+ try {
+ GetMountTableEntriesRequest req =
+ new GetMountTableEntriesRequestPBImpl(request);
+ GetMountTableEntriesResponse response = server.getMountTableEntries(req);
+ GetMountTableEntriesResponsePBImpl responsePB =
+ (GetMountTableEntriesResponsePBImpl)response;
+ return responsePB.getProto();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ /**
+ * Update a single mount table entry.
+ */
+ @Override
+ public UpdateMountTableEntryResponseProto updateMountTableEntry(
+ RpcController controller, UpdateMountTableEntryRequestProto request)
+ throws ServiceException {
+ try {
+ UpdateMountTableEntryRequest req =
+ new UpdateMountTableEntryRequestPBImpl(request);
+ UpdateMountTableEntryResponse response =
+ server.updateMountTableEntry(req);
+ UpdateMountTableEntryResponsePBImpl responsePB =
+ (UpdateMountTableEntryResponsePBImpl)response;
+ return responsePB.getProto();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java
new file mode 100644
index 0000000..43663ac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProto;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryRequestPBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryResponsePBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesRequestPBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryResponsePBImpl;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class forwards MountTableManager calls as RPC calls to the Router
+ * admin server while translating from the parameter types used in
+ * MountTableManager to the new PB types.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class RouterAdminProtocolTranslatorPB
+ implements ProtocolMetaInterface, MountTableManager,
+ Closeable, ProtocolTranslator {
+ final private RouterAdminProtocolPB rpcProxy;
+
+ public RouterAdminProtocolTranslatorPB(RouterAdminProtocolPB proxy) {
+ rpcProxy = proxy;
+ }
+
+ @Override
+ public void close() {
+ RPC.stopProxy(rpcProxy);
+ }
+
+ @Override
+ public Object getUnderlyingProxyObject() {
+ return rpcProxy;
+ }
+
+ @Override
+ public boolean isMethodSupported(String methodName) throws IOException {
+ return RpcClientUtil.isMethodSupported(rpcProxy,
+ RouterAdminProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(RouterAdminProtocolPB.class), methodName);
+ }
+
+ @Override
+ public AddMountTableEntryResponse addMountTableEntry(
+ AddMountTableEntryRequest request) throws IOException {
+ AddMountTableEntryRequestPBImpl requestPB =
+ (AddMountTableEntryRequestPBImpl)request;
+ AddMountTableEntryRequestProto proto = requestPB.getProto();
+ try {
+ AddMountTableEntryResponseProto response =
+ rpcProxy.addMountTableEntry(null, proto);
+ return new AddMountTableEntryResponsePBImpl(response);
+ } catch (ServiceException e) {
+ throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+ }
+ }
+
+ @Override
+ public UpdateMountTableEntryResponse updateMountTableEntry(
+ UpdateMountTableEntryRequest request) throws IOException {
+ UpdateMountTableEntryRequestPBImpl requestPB =
+ (UpdateMountTableEntryRequestPBImpl)request;
+ UpdateMountTableEntryRequestProto proto = requestPB.getProto();
+ try {
+ UpdateMountTableEntryResponseProto response =
+ rpcProxy.updateMountTableEntry(null, proto);
+ return new UpdateMountTableEntryResponsePBImpl(response);
+ } catch (ServiceException e) {
+ throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+ }
+ }
+
+ @Override
+ public RemoveMountTableEntryResponse removeMountTableEntry(
+ RemoveMountTableEntryRequest request) throws IOException {
+ RemoveMountTableEntryRequestPBImpl requestPB =
+ (RemoveMountTableEntryRequestPBImpl)request;
+ RemoveMountTableEntryRequestProto proto = requestPB.getProto();
+ try {
+ RemoveMountTableEntryResponseProto responseProto =
+ rpcProxy.removeMountTableEntry(null, proto);
+ return new RemoveMountTableEntryResponsePBImpl(responseProto);
+ } catch (ServiceException e) {
+ throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+ }
+ }
+
+ @Override
+ public GetMountTableEntriesResponse getMountTableEntries(
+ GetMountTableEntriesRequest request) throws IOException {
+ GetMountTableEntriesRequestPBImpl requestPB =
+ (GetMountTableEntriesRequestPBImpl)request;
+ GetMountTableEntriesRequestProto proto = requestPB.getProto();
+ try {
+ GetMountTableEntriesResponseProto response =
+ rpcProxy.getMountTableEntries(null, proto);
+ return new GetMountTableEntriesResponsePBImpl(response);
+ } catch (ServiceException e) {
+ throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
index b0ced24..d974c78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
@@ -61,7 +61,7 @@ public class MembershipNamenodeResolver
/** Reference to the State Store. */
private final StateStoreService stateStore;
/** Membership State Store interface. */
- private final MembershipStore membershipInterface;
+ private MembershipStore membershipInterface;
/** Parent router ID. */
private String routerId;
@@ -82,25 +82,27 @@ public class MembershipNamenodeResolver
if (this.stateStore != null) {
// Request cache updates from the state store
this.stateStore.registerCacheExternal(this);
-
- // Initialize the interface to get the membership
- this.membershipInterface = this.stateStore.getRegisteredRecordStore(
- MembershipStore.class);
- } else {
- this.membershipInterface = null;
}
+ }
+ private synchronized MembershipStore getMembershipStore() throws IOException {
if (this.membershipInterface == null) {
- throw new IOException("State Store does not have an interface for " +
- MembershipStore.class.getSimpleName());
+ this.membershipInterface = this.stateStore.getRegisteredRecordStore(
+ MembershipStore.class);
+ if (this.membershipInterface == null) {
+ throw new IOException("State Store does not have an interface for " +
+ MembershipStore.class.getSimpleName());
+ }
}
+ return this.membershipInterface;
}
@Override
public boolean loadCache(boolean force) {
// Our cache depends on the store, update it first
try {
- this.membershipInterface.loadCache(force);
+ MembershipStore membership = getMembershipStore();
+ membership.loadCache(force);
} catch (IOException e) {
LOG.error("Cannot update membership from the State Store", e);
}
@@ -126,8 +128,9 @@ public class MembershipNamenodeResolver
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance(partial);
+ MembershipStore membership = getMembershipStore();
GetNamenodeRegistrationsResponse response =
- this.membershipInterface.getNamenodeRegistrations(request);
+ membership.getNamenodeRegistrations(request);
List<MembershipState> records = response.getNamenodeMemberships();
if (records != null && records.size() == 1) {
@@ -135,7 +138,7 @@ public class MembershipNamenodeResolver
UpdateNamenodeRegistrationRequest updateRequest =
UpdateNamenodeRegistrationRequest.newInstance(
record.getNameserviceId(), record.getNamenodeId(), ACTIVE);
- this.membershipInterface.updateNamenodeRegistration(updateRequest);
+ membership.updateNamenodeRegistration(updateRequest);
}
} catch (StateStoreUnavailableException e) {
LOG.error("Cannot update {} as active, State Store unavailable", address);
@@ -226,14 +229,14 @@ public class MembershipNamenodeResolver
NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance();
request.setNamenodeMembership(record);
- return this.membershipInterface.namenodeHeartbeat(request).getResult();
+ return getMembershipStore().namenodeHeartbeat(request).getResult();
}
@Override
public Set<FederationNamespaceInfo> getNamespaces() throws IOException {
GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
GetNamespaceInfoResponse response =
- this.membershipInterface.getNamespaceInfo(request);
+ getMembershipStore().getNamespaceInfo(request);
return response.getNamespaceInfo();
}
@@ -259,8 +262,9 @@ public class MembershipNamenodeResolver
// Retrieve a list of all registrations that match this query.
// This may include all NN records for a namespace/blockpool, including
// duplicate records for the same NN from different routers.
+ MembershipStore membershipStore = getMembershipStore();
GetNamenodeRegistrationsResponse response =
- this.membershipInterface.getNamenodeRegistrations(request);
+ membershipStore.getNamenodeRegistrations(request);
List<MembershipState> memberships = response.getNamenodeMemberships();
if (!addExpired || !addUnavailable) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 213a58f..fcbd2eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -81,6 +81,10 @@ public class Router extends CompositeService {
private RouterRpcServer rpcServer;
private InetSocketAddress rpcAddress;
+ /** RPC interface for the admin. */
+ private RouterAdminServer adminServer;
+ private InetSocketAddress adminAddress;
+
/** Interface with the State Store. */
private StateStoreService stateStore;
@@ -116,6 +120,14 @@ public class Router extends CompositeService {
protected void serviceInit(Configuration configuration) throws Exception {
this.conf = configuration;
+ if (conf.getBoolean(
+ DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
+ DFSConfigKeys.DFS_ROUTER_STORE_ENABLE_DEFAULT)) {
+ // Service that maintains the State Store connection
+ this.stateStore = new StateStoreService();
+ addService(this.stateStore);
+ }
+
// Resolver to track active NNs
this.namenodeResolver = newActiveNamenodeResolver(
this.conf, this.stateStore);
@@ -139,6 +151,14 @@ public class Router extends CompositeService {
}
if (conf.getBoolean(
+ DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE,
+ DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE_DEFAULT)) {
+ // Create admin server
+ this.adminServer = createAdminServer();
+ addService(this.adminServer);
+ }
+
+ if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
@@ -264,6 +284,38 @@ public class Router extends CompositeService {
}
/////////////////////////////////////////////////////////
+ // Admin server
+ /////////////////////////////////////////////////////////
+
+ /**
+ * Create a new router admin server to handle the router admin interface.
+ * The server is only constructed here; it is started later through its own
+ * service lifecycle.
+ *
+ * @return New RouterAdminServer for this router, using its configuration.
+ * @throws IOException If the admin server cannot be created.
+ */
+ protected RouterAdminServer createAdminServer() throws IOException {
+ return new RouterAdminServer(this.conf, this);
+ }
+
+ /**
+ * Set the current Admin socket address for the router.
+ *
+ * @param address Admin RPC address.
+ */
+ protected void setAdminServerAddress(InetSocketAddress address) {
+ this.adminAddress = address;
+ }
+
+ /**
+ * Get the current Admin socket address for the router.
+ *
+ * @return Admin RPC address, or null if it has not been set.
+ */
+ public InetSocketAddress getAdminServerAddress() {
+ return adminAddress;
+ }
+
+ /////////////////////////////////////////////////////////
// Namenode heartbeat monitors
/////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
new file mode 100644
index 0000000..7687216
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
+import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.service.AbstractService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingService;
+
+/**
+ * This class is responsible for handling all of the Admin calls to the HDFS
+ * router. It is created, started, and stopped by {@link Router}. The mount
+ * table operations are delegated to the {@link MountTableStore} registered in
+ * the State Store of the router.
+ */
+public class RouterAdminServer extends AbstractService
+    implements MountTableManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterAdminServer.class);
+
+  private Configuration conf;
+
+  private final Router router;
+
+  /** Mount table store, resolved on the first request that needs it. */
+  private MountTableStore mountTableStore;
+
+  /** The Admin server that listens to requests from clients. */
+  private final Server adminServer;
+  /** Configured host name and actual listening port of the Admin server. */
+  private final InetSocketAddress adminAddress;
+
+  /**
+   * Build the Admin RPC server and report its address to the router. The
+   * RPC server is started in {@link #serviceStart()}.
+   *
+   * @param conf Configuration with the admin address, the bind host and the
+   *          number of handlers.
+   * @param router Router that owns this server and provides the State Store.
+   * @throws IOException If the RPC server cannot be built.
+   */
+  public RouterAdminServer(Configuration conf, Router router)
+      throws IOException {
+    super(RouterAdminServer.class.getName());
+
+    this.conf = conf;
+    this.router = router;
+
+    int handlerCount = this.conf.getInt(
+        DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY,
+        DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT);
+
+    RPC.setProtocolEngine(this.conf, RouterAdminProtocolPB.class,
+        ProtobufRpcEngine.class);
+
+    // Protobuf service that forwards the admin calls to this object
+    RouterAdminProtocolServerSideTranslatorPB adminTranslator =
+        new RouterAdminProtocolServerSideTranslatorPB(this);
+    BlockingService adminPbService = RouterAdminProtocolService.
+        newReflectiveBlockingService(adminTranslator);
+
+    InetSocketAddress confRpcAddress = conf.getSocketAddr(
+        DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY,
+        DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+        DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT,
+        DFSConfigKeys.DFS_ROUTER_ADMIN_PORT_DEFAULT);
+
+    // The bind host, when configured, replaces the host of the address
+    String bindHost = conf.get(
+        DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY,
+        confRpcAddress.getHostName());
+    LOG.info("Admin server binding to {}:{}",
+        bindHost, confRpcAddress.getPort());
+
+    this.adminServer = new RPC.Builder(this.conf)
+        .setProtocol(RouterAdminProtocolPB.class)
+        .setInstance(adminPbService)
+        .setBindAddress(bindHost)
+        .setPort(confRpcAddress.getPort())
+        .setNumHandlers(handlerCount)
+        .setVerbose(false)
+        .build();
+
+    // The RPC-server port can be ephemeral... ensure we have the correct info
+    InetSocketAddress listenAddress = this.adminServer.getListenerAddress();
+    this.adminAddress = new InetSocketAddress(
+        confRpcAddress.getHostName(), listenAddress.getPort());
+    router.setAdminServerAddress(this.adminAddress);
+  }
+
+  /** Allow access to the admin RPC server for testing. */
+  @VisibleForTesting
+  Server getAdminServer() {
+    return this.adminServer;
+  }
+
+  /**
+   * Get the mount table store from the State Store of the router. The store
+   * is looked up on the first call and reused afterwards.
+   *
+   * @return Mount table store.
+   * @throws IOException If the mount table store is not registered.
+   */
+  private MountTableStore getMountTableStore() throws IOException {
+    if (this.mountTableStore == null) {
+      // NOTE(review): router.getStateStore() is presumably null when the
+      // State Store is disabled in the Router; confirm and guard if so.
+      this.mountTableStore = router.getStateStore().getRegisteredRecordStore(
+          MountTableStore.class);
+      if (this.mountTableStore == null) {
+        throw new IOException("Mount table state store is not available.");
+      }
+    }
+    return this.mountTableStore;
+  }
+
+  /**
+   * Get the RPC address of the admin service.
+   * @return Administration service RPC address.
+   */
+  public InetSocketAddress getRpcAddress() {
+    return this.adminAddress;
+  }
+
+  @Override
+  protected void serviceInit(Configuration configuration) throws Exception {
+    this.conf = configuration;
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    this.adminServer.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.adminServer != null) {
+      this.adminServer.stop();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public AddMountTableEntryResponse addMountTableEntry(
+      AddMountTableEntryRequest request) throws IOException {
+    return getMountTableStore().addMountTableEntry(request);
+  }
+
+  @Override
+  public UpdateMountTableEntryResponse updateMountTableEntry(
+      UpdateMountTableEntryRequest request) throws IOException {
+    return getMountTableStore().updateMountTableEntry(request);
+  }
+
+  @Override
+  public RemoveMountTableEntryResponse removeMountTableEntry(
+      RemoveMountTableEntryRequest request) throws IOException {
+    return getMountTableStore().removeMountTableEntry(request);
+  }
+
+  @Override
+  public GetMountTableEntriesResponse getMountTableEntries(
+      GetMountTableEntriesRequest request) throws IOException {
+    return getMountTableStore().getMountTableEntries(request);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java
new file mode 100644
index 0000000..1f76b98
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Client used to talk to a {@link Router} through the admin protocol.
+ */
+@Private
+public class RouterClient implements Closeable {
+
+  private final RouterAdminProtocolTranslatorPB proxy;
+  private final UserGroupInformation ugi;
+
+  /**
+   * Build the client side translator of the Router admin protocol.
+   *
+   * @param address Admin address of the Router.
+   * @param conf Configuration used to set up the RPC proxy.
+   * @param ugi User the calls are made as.
+   * @return Translator wrapping the RPC proxy.
+   * @throws IOException If the proxy cannot be created.
+   */
+  private static RouterAdminProtocolTranslatorPB createRouterProxy(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
+      throws IOException {
+
+    final Class<RouterAdminProtocolPB> protocol = RouterAdminProtocolPB.class;
+    RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
+
+    final long protocolVersion = RPC.getProtocolVersion(protocol);
+    final RouterAdminProtocolPB rpcProxy = RPC.getProtocolProxy(
+        protocol, protocolVersion, address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf),
+        RPC.getRpcTimeout(conf), null,
+        new AtomicBoolean(false)).getProxy();
+
+    return new RouterAdminProtocolTranslatorPB(rpcProxy);
+  }
+
+  /**
+   * Connect to the admin interface of a Router as the current user.
+   *
+   * @param address Admin address of the Router.
+   * @param conf Configuration used to set up the RPC proxy.
+   * @throws IOException If the user or the proxy cannot be obtained.
+   */
+  public RouterClient(InetSocketAddress address, Configuration conf)
+      throws IOException {
+    this.ugi = UserGroupInformation.getCurrentUser();
+    this.proxy = createRouterProxy(address, conf, this.ugi);
+  }
+
+  /**
+   * Get the interface to manage the mount table of the Router.
+   *
+   * @return Mount table manager backed by the RPC proxy.
+   */
+  public MountTableManager getMountTableManager() {
+    return this.proxy;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    RPC.stopProxy(this.proxy);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
new file mode 100644
index 0000000..0786419
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools.federation;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides some Federation administrative access shell commands.
+ */
+@Private
+public class RouterAdmin extends Configured implements Tool {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RouterAdmin.class);
+
+ private RouterClient client;
+
+  /**
+   * Entry point of the tool: run the command and exit with its return code.
+   *
+   * @param argv Command line arguments.
+   * @throws Exception If the command cannot be run.
+   */
+  public static void main(String[] argv) throws Exception {
+    RouterAdmin admin = new RouterAdmin(new HdfsConfiguration());
+    System.exit(ToolRunner.run(admin, argv));
+  }
+
+ /**
+ * Create the admin tool.
+ *
+ * @param conf Configuration handed to the parent class; run() reads the
+ * Router admin address from it.
+ */
+ public RouterAdmin(Configuration conf) {
+ super(conf);
+ }
+
+  /**
+   * Print the commands supported by this tool to the standard output.
+   */
+  public void printUsage() {
+    System.out.println("Federation Admin Tools:\n"
+        + "\t[-add <source> <nameservice> <destination> "
+        + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL]]\n"
+        + "\t[-rm <source>]\n"
+        + "\t[-ls <path>]\n");
+  }
+
+  /**
+   * Run one admin command against the Router.
+   *
+   * @param argv Command (-add, -rm or -ls) followed by its parameters.
+   * @return 0 if the command succeeded, -1 otherwise.
+   * @throws Exception Declared by {@link Tool}; the errors of the commands
+   *           are reported through the return code.
+   */
+  @Override
+  public int run(String[] argv) throws Exception {
+    if (argv.length < 1) {
+      System.err.println("Not enough parameters specified");
+      printUsage();
+      return -1;
+    }
+
+    int exitCode = -1;
+    int i = 0;
+    String cmd = argv[i++];
+
+    // Verify the command and its parameters before connecting to the Router
+    if ("-add".equals(cmd)) {
+      if (argv.length < 4) {
+        System.err.println("Not enough parameters specified for cmd " + cmd);
+        printUsage();
+        return exitCode;
+      }
+    } else if ("-rm".equals(cmd)) {
+      if (argv.length < 2) {
+        System.err.println("Not enough parameters specified for cmd " + cmd);
+        printUsage();
+        return exitCode;
+      }
+    } else if (!"-ls".equals(cmd)) {
+      // Unknown commands are an error and do not need a connection
+      System.err.println("Unknown command " + cmd);
+      printUsage();
+      return exitCode;
+    }
+
+    // Initialize RouterClient
+    try {
+      String address = getConf().getTrimmed(
+          DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+          DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
+      InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
+      client = new RouterClient(routerSocket, getConf());
+    } catch (RPC.VersionMismatch v) {
+      System.err.println(
+          "Version mismatch between client and server... command aborted");
+      return exitCode;
+    } catch (IOException e) {
+      System.err.println("Bad connection to Router... command aborted");
+      return exitCode;
+    }
+
+    Exception debugException = null;
+    exitCode = 0;
+    try {
+      if ("-add".equals(cmd)) {
+        if (addMount(argv, i)) {
+          System.out.println("Successfully added mount point " + argv[i]);
+        } else {
+          exitCode = -1;
+        }
+      } else if ("-rm".equals(cmd)) {
+        if (removeMount(argv[i])) {
+          System.out.println("Successfully removed mount point " + argv[i]);
+        } else {
+          exitCode = -1;
+        }
+      } else {
+        // Only -ls can reach this point; the path defaults to the root
+        listMounts(argv.length > 1 ? argv[i] : "/");
+      }
+    } catch (IllegalArgumentException arge) {
+      debugException = arge;
+      exitCode = -1;
+      System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
+      printUsage();
+    } catch (RemoteException e) {
+      // This is a error returned by the server.
+      // Print out the first line of the error message and the stack trace.
+      exitCode = -1;
+      debugException = e;
+      try {
+        String[] content;
+        content = e.getLocalizedMessage().split("\n");
+        System.err.println(cmd.substring(1) + ": " + content[0]);
+        e.printStackTrace();
+      } catch (Exception ex) {
+        System.err.println(cmd.substring(1) + ": " + ex.getLocalizedMessage());
+        e.printStackTrace();
+        debugException = ex;
+      }
+    } catch (Exception e) {
+      exitCode = -1;
+      debugException = e;
+      System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage());
+      e.printStackTrace();
+    } finally {
+      // Release the RPC connection to the Router
+      try {
+        client.close();
+      } catch (IOException e) {
+        LOG.debug("Cannot close the connection to the Router", e);
+      }
+    }
+    if (debugException != null) {
+      LOG.debug("Exception encountered", debugException);
+    }
+    return exitCode;
+  }
+
+  /**
+   * Add a mount table entry or update if it exists.
+   *
+   * @param parameters Parameters for the mount point: source, nameservices
+   *          (comma separated) and destination, optionally followed by
+   *          -readonly and -order with its value.
+   * @param i Index in the parameters.
+   * @return If the mount point was added or updated.
+   * @throws IOException Error adding the mount point.
+   * @throws IllegalArgumentException If -order is not followed by a value.
+   */
+  public boolean addMount(String[] parameters, int i) throws IOException {
+    // Mandatory parameters
+    String mount = parameters[i++];
+    String[] nss = parameters[i++].split(",");
+    String dest = parameters[i++];
+
+    // Optional parameters
+    boolean readOnly = false;
+    DestinationOrder order = DestinationOrder.HASH;
+    while (i < parameters.length) {
+      if (parameters[i].equals("-readonly")) {
+        readOnly = true;
+      } else if (parameters[i].equals("-order")) {
+        i++;
+        if (i >= parameters.length) {
+          // Report the missing value instead of indexing past the array
+          throw new IllegalArgumentException("Missing value for -order");
+        }
+        try {
+          order = DestinationOrder.valueOf(parameters[i]);
+        } catch (IllegalArgumentException e) {
+          // Best effort: keep the default order if the value is not valid
+          System.err.println("Cannot parse order: " + parameters[i]);
+        }
+      }
+      i++;
+    }
+
+    return addMount(mount, nss, dest, readOnly, order);
+  }
+
+  /**
+   * Add a mount table entry; if an entry with the same source path already
+   * exists, the destinations are added to it and the entry is updated.
+   *
+   * @param mount Mount point.
+   * @param nss Namespaces where this is mounted to.
+   * @param dest Destination path.
+   * @param readonly If the mount point is read only.
+   * @param order Order of the destination locations.
+   * @return If the mount point was added.
+   * @throws IOException Error adding the mount point.
+   */
+  public boolean addMount(String mount, String[] nss, String dest,
+      boolean readonly, DestinationOrder order) throws IOException {
+    MountTableManager manager = client.getMountTableManager();
+
+    // Look for an entry mounted at exactly this source path
+    List<MountTable> candidates = manager.getMountTableEntries(
+        GetMountTableEntriesRequest.newInstance(mount)).getEntries();
+    MountTable current = null;
+    for (MountTable candidate : candidates) {
+      if (mount.equals(candidate.getSourcePath())) {
+        current = candidate;
+      }
+    }
+
+    if (current != null) {
+      // The mount point exists: extend it and push the update
+      for (String nsId : nss) {
+        if (!current.addDestination(nsId, dest)) {
+          System.err.println("Cannot add destination at " + nsId + " " + dest);
+        }
+      }
+      if (readonly) {
+        current.setReadOnly(true);
+      }
+      if (order != null) {
+        current.setDestOrder(order);
+      }
+      boolean updated = manager.updateMountTableEntry(
+          UpdateMountTableEntryRequest.newInstance(current)).getStatus();
+      if (!updated) {
+        System.err.println("Cannot update mount point " + mount);
+      }
+      return updated;
+    }
+
+    // The mount point is new: every namespace maps to the same destination
+    Map<String, String> destinations = new LinkedHashMap<>();
+    for (String nsId : nss) {
+      destinations.put(nsId, dest);
+    }
+    MountTable created = MountTable.newInstance(mount, destinations);
+    if (readonly) {
+      created.setReadOnly(true);
+    }
+    if (order != null) {
+      created.setDestOrder(order);
+    }
+    boolean added = manager.addMountTableEntry(
+        AddMountTableEntryRequest.newInstance(created)).getStatus();
+    if (!added) {
+      System.err.println("Cannot add mount point " + mount);
+    }
+    return added;
+  }
+
+  /**
+   * Remove mount point.
+   *
+   * @param path Path to remove.
+   * @return If the mount point was removed.
+   * @throws IOException If it cannot be removed.
+   */
+  public boolean removeMount(String path) throws IOException {
+    MountTableManager mountTable = client.getMountTableManager();
+    RemoveMountTableEntryRequest request =
+        RemoveMountTableEntryRequest.newInstance(path);
+    RemoveMountTableEntryResponse response =
+        mountTable.removeMountTableEntry(request);
+    boolean removed = response.getStatus();
+    if (!removed) {
+      // Failures go to the standard error, like in addMount()
+      System.err.println("Cannot remove mount point " + path);
+    }
+    return removed;
+  }
+
+  /**
+   * Print the mount points returned by the Router for a path.
+   *
+   * @param path Path to list.
+   * @throws IOException If it cannot be listed.
+   */
+  public void listMounts(String path) throws IOException {
+    MountTableManager manager = client.getMountTableManager();
+    List<MountTable> mounts = manager.getMountTableEntries(
+        GetMountTableEntriesRequest.newInstance(path)).getEntries();
+    printMounts(mounts);
+  }
+
+  /**
+   * Print a table with the source path of each mount point and its
+   * destinations as comma separated "nameservice->path" pairs.
+   *
+   * @param entries Mount table entries to print.
+   */
+  private static void printMounts(List<MountTable> entries) {
+    final String rowFormat = "%-25s %-25s";
+    System.out.println("Mount Table Entries:");
+    System.out.println(String.format(rowFormat, "Source", "Destinations"));
+    for (MountTable entry : entries) {
+      StringBuilder destinations = new StringBuilder();
+      String separator = "";
+      for (RemoteLocation location : entry.getDestinations()) {
+        destinations.append(separator);
+        destinations.append(String.format("%s->%s",
+            location.getNameserviceId(), location.getDest()));
+        separator = ",";
+      }
+      System.out.println(String.format(
+          rowFormat, entry.getSourcePath(), destinations.toString()));
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/package-info.java
new file mode 100644
index 0000000..466c3d3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tools to manage the Router-based federation, including the utilities to
+ * add, remove and list mount table entries.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.tools.federation;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/RouterProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/RouterProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/RouterProtocol.proto
new file mode 100644
index 0000000..3f43040
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/RouterProtocol.proto
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "RouterProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs.router;
+
+import "FederationProtocol.proto";
+
+/**
+ * Admin protocol of the Router to manage the mount table. The request and
+ * response messages are not declared in this file; presumably they come from
+ * the imported FederationProtocol.proto (TODO confirm).
+ */
+service RouterAdminProtocolService {
+ /**
+ * Add a mount table entry.
+ */
+ rpc addMountTableEntry(AddMountTableEntryRequestProto) returns(AddMountTableEntryResponseProto);
+
+ /**
+ * Update an existing mount table entry without copying files.
+ */
+ rpc updateMountTableEntry(UpdateMountTableEntryRequestProto) returns(UpdateMountTableEntryResponseProto);
+
+ /**
+ * Remove a mount table entry.
+ */
+ rpc removeMountTableEntry(RemoveMountTableEntryRequestProto) returns(RemoveMountTableEntryResponseProto);
+
+ /**
+ * Get the mount table entries matching the request.
+ */
+ rpc getMountTableEntries(GetMountTableEntriesRequestProto) returns(GetMountTableEntriesResponseProto);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 90eee0a..b38fa14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4725,6 +4725,44 @@
</description>
</property>
+ <property>
+ <name>dfs.federation.router.admin.enable</name>
+ <value>true</value>
+ <description>
+ If the RPC admin service to handle client requests in the router is
+ enabled.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.admin-address</name>
+ <value>0.0.0.0:8111</value>
+ <description>
+ RPC address that handles the admin requests.
+ The value of this property will take the form of router-host1:rpc-port.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.admin-bind-host</name>
+ <value></value>
+ <description>
+ The actual address the RPC admin server will bind to. If this optional
+ address is set, it overrides only the hostname portion of
+ dfs.federation.router.admin-address. This is useful for making the router
+ admin server listen on all interfaces by setting it to 0.0.0.0.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.admin.handler.count</name>
+ <value>1</value>
+ <description>
+ The number of server threads for the router to handle RPC requests from
+ admin.
+ </description>
+ </property>
+
<property>
<name>dfs.federation.router.file.resolver.client.class</name>
<value>org.apache.hadoop.hdfs.server.federation.MockResolver</value>
@@ -4742,6 +4780,14 @@
</property>
<property>
+ <name>dfs.federation.router.store.enable</name>
+ <value>true</value>
+ <description>
+ If the Router connects to the State Store.
+ </description>
+ </property>
+
+ <property>
<name>dfs.federation.router.store.serializer</name>
<value>org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl</value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index 21555c5..cac5e6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -28,8 +28,10 @@ public class RouterConfigBuilder {
private Configuration conf;
private boolean enableRpcServer = false;
+ private boolean enableAdminServer = false;
private boolean enableHeartbeat = false;
private boolean enableLocalHeartbeat = false;
+ private boolean enableStateStore = false;
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
@@ -41,8 +43,10 @@ public class RouterConfigBuilder {
public RouterConfigBuilder all() {
this.enableRpcServer = true;
+ this.enableAdminServer = true;
this.enableHeartbeat = true;
this.enableLocalHeartbeat = true;
+ this.enableStateStore = true;
return this;
}
@@ -56,21 +60,43 @@ public class RouterConfigBuilder {
return this;
}
+ public RouterConfigBuilder admin(boolean enable) {
+ this.enableAdminServer = enable;
+ return this;
+ }
+
public RouterConfigBuilder heartbeat(boolean enable) {
this.enableHeartbeat = enable;
return this;
}
+ public RouterConfigBuilder stateStore(boolean enable) {
+ this.enableStateStore = enable;
+ return this;
+ }
+
public RouterConfigBuilder rpc() {
return this.rpc(true);
}
+ public RouterConfigBuilder admin() {
+ return this.admin(true);
+ }
+
public RouterConfigBuilder heartbeat() {
return this.heartbeat(true);
}
+ public RouterConfigBuilder stateStore() {
+ return this.stateStore(true);
+ }
+
public Configuration build() {
+ conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
+ this.enableStateStore);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer);
+ conf.setBoolean(DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE,
+ this.enableAdminServer);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
this.enableHeartbeat);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
index 0830c19..1ee49d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
@@ -25,8 +25,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
@@ -46,6 +50,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
@@ -67,6 +72,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServi
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -101,6 +107,15 @@ public class RouterDFSCluster {
/** Mini cluster. */
private MiniDFSCluster cluster;
+ protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
+ TimeUnit.SECONDS.toMillis(5);
+ protected static final long DEFAULT_CACHE_INTERVAL_MS =
+ TimeUnit.SECONDS.toMillis(5);
+ /** Heartbeat interval in milliseconds. */
+ private long heartbeatInterval;
+ /** Cache flush interval in milliseconds. */
+ private long cacheFlushInterval;
+
/** Router configuration overrides. */
private Configuration routerOverrides;
/** Namenode configuration overrides. */
@@ -118,6 +133,7 @@ public class RouterDFSCluster {
private int rpcPort;
private DFSClient client;
private Configuration conf;
+ private RouterClient adminClient;
private URI fileSystemUri;
public RouterContext(Configuration conf, String nsId, String nnId)
@@ -183,6 +199,15 @@ public class RouterDFSCluster {
});
}
+ public RouterClient getAdminClient() throws IOException {
+ if (adminClient == null) {
+ InetSocketAddress routerSocket = router.getAdminServerAddress();
+ LOG.info("Connecting to router admin at {}", routerSocket);
+ adminClient = new RouterClient(routerSocket, conf);
+ }
+ return adminClient;
+ }
+
public DFSClient getClient() throws IOException, URISyntaxException {
if (client == null) {
LOG.info("Connecting to router at {}", fileSystemUri);
@@ -304,13 +329,22 @@ public class RouterDFSCluster {
}
}
- public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) {
+ public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes,
+ long heartbeatInterval, long cacheFlushInterval) {
this.highAvailability = ha;
+ this.heartbeatInterval = heartbeatInterval;
+ this.cacheFlushInterval = cacheFlushInterval;
configureNameservices(numNameservices, numNamenodes);
}
public RouterDFSCluster(boolean ha, int numNameservices) {
- this(ha, numNameservices, 2);
+ this(ha, numNameservices, 2,
+ DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
+ }
+
+ public RouterDFSCluster(boolean ha, int numNameservices, int numNamnodes) {
+ this(ha, numNameservices, numNamnodes,
+ DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
}
/**
@@ -404,7 +438,12 @@ public class RouterDFSCluster {
conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
+ conf.set(DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0");
+ conf.set(DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0");
+
conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0));
+ conf.setLong(DFS_ROUTER_HEARTBEAT_INTERVAL_MS, heartbeatInterval);
+ conf.setLong(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, cacheFlushInterval);
// Use mock resolver classes
conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5673b7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
new file mode 100644
index 0000000..e42ab50
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation;
+
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+/**
+ * Test utility to mimic a federated HDFS cluster with a router and a state
+ * store.
+ */
+public class StateStoreDFSCluster extends RouterDFSCluster {
+
+ private static final Class<?> DEFAULT_FILE_RESOLVER =
+ MountTableResolver.class;
+ private static final Class<?> DEFAULT_NAMENODE_RESOLVER =
+ MembershipNamenodeResolver.class;
+
+ public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
+ long heartbeatInterval, long cacheFlushInterval)
+ throws IOException, InterruptedException {
+ this(ha, numNameservices, numNamenodes, heartbeatInterval,
+ cacheFlushInterval, DEFAULT_FILE_RESOLVER);
+ }
+
+ public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
+ long heartbeatInterval, long cacheFlushInterval, Class<?> fileResolver)
+ throws IOException, InterruptedException {
+ super(ha, numNameservices, numNamenodes, heartbeatInterval,
+ cacheFlushInterval);
+
+ // Attach state store and resolvers to router
+ Configuration stateStoreConfig = getStateStoreConfiguration();
+ // Use state store backed resolvers
+ stateStoreConfig.setClass(
+ DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+ DEFAULT_NAMENODE_RESOLVER, ActiveNamenodeResolver.class);
+ stateStoreConfig.setClass(
+ DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+ fileResolver, FileSubclusterResolver.class);
+ this.addRouterOverrides(stateStoreConfig);
+ }
+
+ public StateStoreDFSCluster(boolean ha, int numNameservices,
+ Class<?> fileResolver) throws IOException, InterruptedException {
+ this(ha, numNameservices, 2,
+ DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS, fileResolver);
+ }
+
+ public StateStoreDFSCluster(boolean ha, int numNameservices)
+ throws IOException, InterruptedException {
+ this(ha, numNameservices, 2,
+ DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
+ }
+
+ public StateStoreDFSCluster(boolean ha, int numNameservices,
+ int numNamnodes) throws IOException, InterruptedException {
+ this(ha, numNameservices, numNamnodes,
+ DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // State Store Test Fixtures
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Adds test fixtures for NN registration for each NN: nameservice -> NS,
+ * namenode -> NN, rpcAddress -> 0.0.0.0:0, webAddress -> 0.0.0.0:0, state ->
+ * STANDBY, safeMode -> false, blockPool -> test.
+ *
+ * @param stateStore State Store.
+ * @throws IOException If it cannot register.
+ */
+ public void createTestRegistration(StateStoreService stateStore)
+ throws IOException {
+ List<MembershipState> entries = new ArrayList<MembershipState>();
+ for (NamenodeContext nn : this.getNamenodes()) {
+ MembershipState entry = createMockRegistrationForNamenode(
+ nn.getNameserviceId(), nn.getNamenodeId(),
+ FederationNamenodeServiceState.STANDBY);
+ entries.add(entry);
+ }
+ synchronizeRecords(
+ stateStore, entries, MembershipState.class);
+ }
+
+ public void createTestMountTable(StateStoreService stateStore)
+ throws IOException {
+ List<MountTable> mounts = generateMockMountTable();
+ synchronizeRecords(stateStore, mounts, MountTable.class);
+ stateStore.refreshCaches();
+ }
+
+ public List<MountTable> generateMockMountTable() throws IOException {
+ // create table entries
+ List<MountTable> entries = new ArrayList<>();
+ for (String ns : this.getNameservices()) {
+ Map<String, String> destMap = new HashMap<>();
+ destMap.put(ns, getNamenodePathForNS(ns));
+
+ // Direct path
+ String fedPath = getFederatedPathForNS(ns);
+ MountTable entry = MountTable.newInstance(fedPath, destMap);
+ entries.add(entry);
+ }
+
+ // Root path goes to the first nameservice
+ Map<String, String> destMap = new HashMap<>();
+ String ns0 = this.getNameservices().get(0);
+ destMap.put(ns0, "/");
+ MountTable entry = MountTable.newInstance("/", destMap);
+ entries.add(entry);
+ return entries;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org