Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/07/02 20:32:54 UTC
[37/45] hadoop git commit: HDFS-13536. [PROVIDED Storage] HA for InMemoryAliasMap. Contributed by Virajith Jalaparti.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1804a315
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1804a315
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1804a315
Branch: refs/heads/HDDS-4
Commit: 1804a31515e541b3371925aa895589919b54d443
Parents: 5cc2541
Author: Inigo Goiri <in...@apache.org>
Authored: Mon Jul 2 10:48:20 2018 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Mon Jul 2 10:48:20 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSUtilClient.java | 4 +-
.../hdfs/client/HdfsClientConfigKeys.java | 3 +
.../ha/ConfiguredFailoverProxyProvider.java | 9 +-
.../InMemoryAliasMapFailoverProxyProvider.java | 38 ++
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 +-
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 37 +-
.../org/apache/hadoop/hdfs/NameNodeProxies.java | 15 +-
...yAliasMapProtocolClientSideTranslatorPB.java | 95 ++++-
.../aliasmap/InMemoryAliasMapProtocol.java | 5 +
.../aliasmap/InMemoryLevelDBAliasMapServer.java | 19 +-
.../impl/InMemoryLevelDBAliasMapClient.java | 80 ++--
.../src/main/resources/hdfs-default.xml | 22 +-
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 13 +-
.../apache/hadoop/hdfs/MiniDFSNNTopology.java | 2 +-
.../impl/TestInMemoryLevelDBAliasMapClient.java | 7 +
.../namenode/ITestProvidedImplementation.java | 371 ++++++++++++++++---
16 files changed, 615 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 313b973..3fac7c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -396,7 +396,7 @@ public class DFSUtilClient {
* @param keys Set of keys to look for in the order of preference
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
- static Map<String, Map<String, InetSocketAddress>> getAddresses(
+ public static Map<String, Map<String, InetSocketAddress>> getAddresses(
Configuration conf, String defaultAddress, String... keys) {
Collection<String> nameserviceIds = getNameServiceIds(conf);
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
@@ -426,7 +426,7 @@ public class DFSUtilClient {
return ret;
}
- static Map<String, InetSocketAddress> getAddressesForNameserviceId(
+ public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue, String... keys) {
Collection<String> nnIds = getNameNodeIds(conf, nsId);
Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
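
Note: widening these helpers to public lets components beyond the HA NameNode client resolve per-nameservice RPC addresses for arbitrary config keys. A minimal lookup sketch, assuming an HA nameservice "ns1" with namenodes "nn1"/"nn2" (the wrapper class, ids, and hosts are illustrative, not from this commit):

import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class AliasMapAddressLookup {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.nameservices", "ns1");
    conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
    String key = HdfsClientConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
    conf.set(key + ".ns1.nn1", "host1:50200");
    conf.set(key + ".ns1.nn2", "host2:50200");
    // Returns nameserviceId -> (namenodeId -> address), per the javadoc above.
    Map<String, Map<String, InetSocketAddress>> byNs =
        DFSUtilClient.getAddresses(conf, null, key);
    System.out.println(byNs.get("ns1"));
  }
}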
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index f2cec31..a812670 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -184,6 +184,9 @@ public interface HdfsClientConfigKeys {
"dfs.namenode.snapshot.capture.openfiles";
boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT = false;
+ String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS =
+ "dfs.provided.aliasmap.inmemory.dnrpc-address";
+
/**
* These are deprecated config keys to client code.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
index 96722fc..f46532a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+
/**
* A FailoverProxyProvider implementation which allows one to configure
* multiple URIs to connect to during fail-over. A random configured address is
@@ -60,6 +62,11 @@ public class ConfiguredFailoverProxyProvider<T> extends
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory) {
+ this(conf, uri, xface, factory, DFS_NAMENODE_RPC_ADDRESS_KEY);
+ }
+
+ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
+ Class<T> xface, HAProxyFactory<T> factory, String addressKey) {
this.xface = xface;
this.conf = new Configuration(conf);
int maxRetries = this.conf.getInt(
@@ -81,7 +88,7 @@ public class ConfiguredFailoverProxyProvider<T> extends
ugi = UserGroupInformation.getCurrentUser();
Map<String, Map<String, InetSocketAddress>> map =
- DFSUtilClient.getHaNnRpcAddresses(conf);
+ DFSUtilClient.getAddresses(conf, null, addressKey);
Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
if (addressesInNN == null || addressesInNN.size() == 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java
new file mode 100644
index 0000000..6525942
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.URI;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
+
+/**
+ * A {@link ConfiguredFailoverProxyProvider} implementation used to connect
+ * to an InMemoryAliasMap.
+ */
+public class InMemoryAliasMapFailoverProxyProvider<T>
+ extends ConfiguredFailoverProxyProvider<T> {
+
+ public InMemoryAliasMapFailoverProxyProvider(
+ Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
+ super(conf, uri, xface, factory,
+ DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS);
+ }
+}
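
The provider only swaps the address key it scans, so clients reach an HA aliasmap the same way they reach an HA NameNode. A wiring sketch, assuming the nameservice id "ns1" (illustrative); init() later in this diff does the same thing programmatically:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.InMemoryAliasMapFailoverProxyProvider;

public class AliasMapProviderWiring {
  public static URI wire(Configuration conf) {
    // Select this provider for the "ns1" nameservice; proxy creation for
    // hdfs://ns1 then fails over across the configured aliasmap addresses.
    conf.setClass("dfs.client.failover.proxy.provider.ns1",
        InMemoryAliasMapFailoverProxyProvider.class,
        AbstractNNFailoverProxyProvider.class);
    return URI.create("hdfs://ns1");
  }
}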
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index dde7eb7..cc902b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -86,8 +86,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50105";
public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
- public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS = "dfs.provided.aliasmap.inmemory.dnrpc-address";
+ public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS =
+ HdfsClientConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT = "0.0.0.0:50200";
+ public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST = "dfs.provided.aliasmap.inmemory.rpc.bind-host";
+
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR = "dfs.provided.aliasmap.inmemory.leveldb.dir";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE = "dfs.provided.aliasmap.inmemory.batch-size";
public static final int DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE_DEFAULT = 500;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 4c94e38..f7cd32b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -1130,7 +1130,42 @@ public class DFSUtil {
return getSuffixIDs(conf, addressKey, null, nnId, LOCAL_ADDRESS_MATCHER)[0];
}
-
+
+ /**
+ * Determine the {@link InetSocketAddress} to bind to, for any service.
+ * In case of HA or federation, the address is assumed to be specified as
+ * {@code confKey}.NAMESPACEID.NAMENODEID as appropriate.
+ *
+ * @param conf configuration.
+ * @param confKey configuration key (prefix if HA/federation) used to
+ * specify the address for the service.
+ * @param defaultValue default value for the address.
+ * @param bindHostKey configuration key (prefix if HA/federation)
+ * specifying host to bind to.
+ * @return the address to bind to.
+ */
+ public static InetSocketAddress getBindAddress(Configuration conf,
+ String confKey, String defaultValue, String bindHostKey) {
+ InetSocketAddress address;
+ String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+ String bindHostActualKey;
+ if (nsId != null) {
+ String namenodeId = HAUtil.getNameNodeId(conf, nsId);
+ address = DFSUtilClient.getAddressesForNameserviceId(
+ conf, nsId, null, confKey).get(namenodeId);
+ bindHostActualKey = DFSUtil.addKeySuffixes(bindHostKey, nsId, namenodeId);
+ } else {
+ address = NetUtils.createSocketAddr(conf.get(confKey, defaultValue));
+ bindHostActualKey = bindHostKey;
+ }
+
+ String bindHost = conf.get(bindHostActualKey);
+ if (bindHost == null || bindHost.isEmpty()) {
+ bindHost = address.getHostName();
+ }
+ return new InetSocketAddress(bindHost, address.getPort());
+ }
+
/**
* Returns nameservice Id and namenode Id when the local host matches the
* configuration parameter {@code addressKey}.<nameservice Id>.<namenode Id>
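
InMemoryLevelDBAliasMapServer uses this helper later in this diff; a compact sketch of the call, using the key constants this commit adds (the wrapper class is illustrative):

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST;

public class AliasMapBindAddress {
  public static InetSocketAddress resolve(Configuration conf) {
    // Without HA/federation the plain keys are read; with them, the keys
    // suffixed with .NSID.NNID are read, and the optional bind-host key
    // overrides only the host portion of the resolved address.
    return DFSUtil.getBindAddress(conf,
        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT,
        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST);
  }
}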
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index d556c90..b63d26b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -31,10 +31,13 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
@@ -184,6 +187,8 @@ public class NameNodeProxies {
conf, ugi);
} else if (xface == RefreshCallQueueProtocol.class) {
proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
+ } else if (xface == InMemoryAliasMapProtocol.class) {
+ proxy = (T) createNNProxyWithInMemoryAliasMapProtocol(nnAddr, conf, ugi);
} else {
String message = "Unsupported protocol found when creating the proxy " +
"connection to NameNode: " +
@@ -194,7 +199,15 @@ public class NameNodeProxies {
return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
}
-
+
+ private static InMemoryAliasMapProtocol createNNProxyWithInMemoryAliasMapProtocol(
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
+ throws IOException {
+ AliasMapProtocolPB proxy = (AliasMapProtocolPB) createNameNodeProxy(
+ address, conf, ugi, AliasMapProtocolPB.class, 30000);
+ return new InMemoryAliasMapProtocolClientSideTranslatorPB(proxy);
+ }
+
private static JournalProtocol createNNProxyWithJournalProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
throws IOException {
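
With that branch in place, an aliasmap proxy comes from the existing factory. A hedged sketch (the URI is assumed to be either a host:port or an HA nameservice URI with the failover provider from earlier in this diff configured; the wrapper class is illustrative):

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.ipc.RPC;

public class AliasMapProxyExample {
  public static void run(Configuration conf, URI aliasMapUri)
      throws IOException {
    InMemoryAliasMapProtocol proxy = NameNodeProxies
        .createProxy(conf, aliasMapUri, InMemoryAliasMapProtocol.class)
        .getProxy();
    System.out.println("block pool: " + proxy.getBlockPoolId());
    RPC.stopProxy(proxy); // also what the aliasmap client below does on close
  }
}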
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
index fc23c88..2025c16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
@@ -20,27 +20,38 @@ import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.InMemoryAliasMapFailoverProxyProvider;
import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import java.io.Closeable;
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSUtil.addKeySuffixes;
+import static org.apache.hadoop.hdfs.DFSUtil.createUri;
+import static org.apache.hadoop.hdfs.DFSUtilClient.getNameServiceIds;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX;
import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.*;
@@ -52,7 +63,7 @@ import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.*;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InMemoryAliasMapProtocolClientSideTranslatorPB
- implements InMemoryAliasMapProtocol {
+ implements InMemoryAliasMapProtocol, Closeable {
private static final Logger LOG =
LoggerFactory
@@ -60,22 +71,61 @@ public class InMemoryAliasMapProtocolClientSideTranslatorPB
private AliasMapProtocolPB rpcProxy;
- public InMemoryAliasMapProtocolClientSideTranslatorPB(Configuration conf) {
- String addr = conf.getTrimmed(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
- DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT);
- InetSocketAddress aliasMapAddr = NetUtils.createSocketAddr(addr);
+ public InMemoryAliasMapProtocolClientSideTranslatorPB(
+ AliasMapProtocolPB rpcProxy) {
+ this.rpcProxy = rpcProxy;
+ }
- RPC.setProtocolEngine(conf, AliasMapProtocolPB.class,
- ProtobufRpcEngine.class);
- LOG.info("Connecting to address: " + addr);
- try {
- rpcProxy = RPC.getProxy(AliasMapProtocolPB.class,
- RPC.getProtocolVersion(AliasMapProtocolPB.class), aliasMapAddr, null,
- conf, NetUtils.getDefaultSocketFactory(conf), 0);
- } catch (IOException e) {
- throw new RuntimeException(
- "Error in connecting to " + addr + " Got: " + e);
+ public static Collection<InMemoryAliasMapProtocol> init(Configuration conf) {
+ Collection<InMemoryAliasMapProtocol> aliasMaps = new ArrayList<>();
+ // Try to connect to all configured nameservices as it is not known which
+ // nameservice supports the AliasMap.
+ for (String nsId : getNameServiceIds(conf)) {
+ try {
+ URI namenodeURI = null;
+ Configuration newConf = new Configuration(conf);
+ if (HAUtil.isHAEnabled(conf, nsId)) {
+ // set the failover-proxy provider if HA is enabled.
+ newConf.setClass(
+ addKeySuffixes(PROXY_PROVIDER_KEY_PREFIX, nsId),
+ InMemoryAliasMapFailoverProxyProvider.class,
+ AbstractNNFailoverProxyProvider.class);
+ namenodeURI = new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId);
+ } else {
+ String key =
+ addKeySuffixes(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, nsId);
+ String addr = conf.get(key);
+ if (addr != null) {
+ namenodeURI = createUri(HdfsConstants.HDFS_URI_SCHEME,
+ NetUtils.createSocketAddr(addr));
+ }
+ }
+ if (namenodeURI != null) {
+ aliasMaps.add(NameNodeProxies
+ .createProxy(newConf, namenodeURI, InMemoryAliasMapProtocol.class)
+ .getProxy());
+ LOG.info("Connected to InMemoryAliasMap at {}", namenodeURI);
+ }
+ } catch (IOException | URISyntaxException e) {
+ LOG.warn("Exception in connecting to InMemoryAliasMap for nameservice "
+ + "{}: {}", nsId, e);
+ }
}
+ // if a separate AliasMap is configured using
+ // DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, try to connect to it.
+ if (conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS) != null) {
+ URI uri = createUri("hdfs", NetUtils.createSocketAddr(
+ conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS)));
+ try {
+ aliasMaps.add(NameNodeProxies
+ .createProxy(conf, uri, InMemoryAliasMapProtocol.class).getProxy());
+ LOG.info("Connected to InMemoryAliasMap at {}", uri);
+ } catch (IOException e) {
+ LOG.warn("Exception in connecting to InMemoryAliasMap at {}: {}", uri,
+ e);
+ }
+ }
+ return aliasMaps;
}
@Override
@@ -168,7 +218,12 @@ public class InMemoryAliasMapProtocolClientSideTranslatorPB
}
}
- public void stop() {
- RPC.stopProxy(rpcProxy);
+ @Override
+ public void close() throws IOException {
+ LOG.info("Stopping rpcProxy in" +
+ "InMemoryAliasMapProtocolClientSideTranslatorPB");
+ if (rpcProxy != null) {
+ RPC.stopProxy(rpcProxy);
+ }
}
}
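
A sketch of driving init() directly; it yields one proxy per aliasmap it could reach (HA nameservices via the failover provider, plus a standalone address if one is configured). The wrapper class is illustrative:

import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.ipc.RPC;

public class AliasMapDiscovery {
  public static void discover(Configuration conf) throws IOException {
    Collection<InMemoryAliasMapProtocol> aliasMaps =
        InMemoryAliasMapProtocolClientSideTranslatorPB.init(conf);
    for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
      System.out.println("aliasmap block pool: " + aliasMap.getBlockPoolId());
      RPC.stopProxy(aliasMap);
    }
  }
}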
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
index 89f590c..c3824e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.io.retry.Idempotent;
import javax.annotation.Nonnull;
import java.io.IOException;
@@ -69,6 +70,7 @@ public interface InMemoryAliasMapProtocol {
* FileRegions and the next marker.
* @throws IOException
*/
+ @Idempotent
InMemoryAliasMap.IterationResult list(Optional<Block> marker)
throws IOException;
@@ -80,6 +82,7 @@ public interface InMemoryAliasMapProtocol {
* @throws IOException
*/
@Nonnull
+ @Idempotent
Optional<ProvidedStorageLocation> read(@Nonnull Block block)
throws IOException;
@@ -90,6 +93,7 @@ public interface InMemoryAliasMapProtocol {
* @param providedStorageLocation
* @throws IOException
*/
+ @Idempotent
void write(@Nonnull Block block,
@Nonnull ProvidedStorageLocation providedStorageLocation)
throws IOException;
@@ -99,5 +103,6 @@ public interface InMemoryAliasMapProtocol {
* @return the block pool id associated with the Namenode running
* the in-memory alias map.
*/
+ @Idempotent
String getBlockPoolId() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
index 4edc9a2..1d06f13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolPB;
@@ -34,9 +33,13 @@ import org.apache.hadoop.ipc.RPC;
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.Optional;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST;
+import static org.apache.hadoop.hdfs.DFSUtil.getBindAddress;
import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.CheckedFunction2;
@@ -79,18 +82,16 @@ public class InMemoryLevelDBAliasMapServer implements InMemoryAliasMapProtocol,
AliasMapProtocolService
.newReflectiveBlockingService(aliasMapProtocolXlator);
- String rpcAddress =
- conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
- DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT);
- String[] split = rpcAddress.split(":");
- String bindHost = split[0];
- Integer port = Integer.valueOf(split[1]);
+ InetSocketAddress rpcAddress = getBindAddress(conf,
+ DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
+ DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT,
+ DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST);
aliasMapServer = new RPC.Builder(conf)
.setProtocol(AliasMapProtocolPB.class)
.setInstance(aliasMapProtocolService)
- .setBindAddress(bindHost)
- .setPort(port)
+ .setBindAddress(rpcAddress.getHostName())
+ .setPort(rpcAddress.getPort())
.setNumHandlers(1)
.setVerbose(true)
.build();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
index d389184..fb5ee93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
@@ -24,11 +24,17 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
+import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -44,17 +50,28 @@ import java.util.Optional;
public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
implements Configurable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(InMemoryLevelDBAliasMapClient.class);
private Configuration conf;
- private InMemoryAliasMapProtocolClientSideTranslatorPB aliasMap;
- private String blockPoolID;
+ private Collection<InMemoryAliasMapProtocol> aliasMaps;
@Override
public void close() {
- aliasMap.stop();
+ if (aliasMaps != null) {
+ for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
+ RPC.stopProxy(aliasMap);
+ }
+ }
}
class LevelDbReader extends BlockAliasMap.Reader<FileRegion> {
+ private InMemoryAliasMapProtocol aliasMap;
+
+ LevelDbReader(InMemoryAliasMapProtocol aliasMap) {
+ this.aliasMap = aliasMap;
+ }
+
@Override
public Optional<FileRegion> resolve(Block block) throws IOException {
Optional<ProvidedStorageLocation> read = aliasMap.read(block);
@@ -114,6 +131,13 @@ public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
}
class LevelDbWriter extends BlockAliasMap.Writer<FileRegion> {
+
+ private InMemoryAliasMapProtocol aliasMap;
+
+ LevelDbWriter(InMemoryAliasMapProtocol aliasMap) {
+ this.aliasMap = aliasMap;
+ }
+
@Override
public void store(FileRegion fileRegion) throws IOException {
aliasMap.write(fileRegion.getBlock(),
@@ -130,40 +154,53 @@ public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
throw new UnsupportedOperationException("Unable to start "
+ "InMemoryLevelDBAliasMapClient as security is enabled");
}
+ aliasMaps = new ArrayList<>();
}
-
- @Override
- public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
+ private InMemoryAliasMapProtocol getAliasMap(String blockPoolID)
throws IOException {
- if (this.blockPoolID == null) {
- this.blockPoolID = aliasMap.getBlockPoolId();
+ if (blockPoolID == null) {
+ throw new IOException("Block pool id required to get aliasmap reader");
}
// if a block pool id has been supplied, and doesn't match the associated
- // block pool id, return null.
- if (blockPoolID != null && this.blockPoolID != null
- && !this.blockPoolID.equals(blockPoolID)) {
- return null;
+ // block pool id of any connected aliasmap, an IOException is thrown.
+ for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
+ try {
+ String aliasMapBlockPoolId = aliasMap.getBlockPoolId();
+ if (aliasMapBlockPoolId != null &&
+ aliasMapBlockPoolId.equals(blockPoolID)) {
+ return aliasMap;
+ }
+ } catch (IOException e) {
+ LOG.error("Exception in retrieving block pool id {}", e);
+ }
}
- return new LevelDbReader();
+ throw new IOException(
+ "Unable to retrive InMemoryAliasMap for block pool id " + blockPoolID);
+ }
+
+ @Override
+ public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
+ throws IOException {
+ InMemoryAliasMapProtocol aliasMap = getAliasMap(blockPoolID);
+ LOG.info("Loading InMemoryAliasMapReader for block pool id {}",
+ blockPoolID);
+ return new LevelDbReader(aliasMap);
}
@Override
public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolID)
throws IOException {
- if (this.blockPoolID == null) {
- this.blockPoolID = aliasMap.getBlockPoolId();
- }
- if (blockPoolID != null && !this.blockPoolID.equals(blockPoolID)) {
- return null;
- }
- return new LevelDbWriter();
+ InMemoryAliasMapProtocol aliasMap = getAliasMap(blockPoolID);
+ LOG.info("Loading InMemoryAliasMapWriter for block pool id {}",
+ blockPoolID);
+ return new LevelDbWriter(aliasMap);
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
- this.aliasMap = new InMemoryAliasMapProtocolClientSideTranslatorPB(conf);
+ aliasMaps = InMemoryAliasMapProtocolClientSideTranslatorPB.init(conf);
}
@Override
@@ -174,5 +211,4 @@ public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
@Override
public void refresh() throws IOException {
}
-
}
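
A usage sketch of the reworked client; the block pool id shown is the one this diff's tests use, the reader options argument is unused by this implementation, and the wrapper class is illustrative:

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;

public class AliasMapClientExample {
  public static void read(Configuration conf, long blockId)
      throws IOException {
    InMemoryLevelDBAliasMapClient client = new InMemoryLevelDBAliasMapClient();
    client.setConf(conf); // connects to all configured aliasmaps via init()
    BlockAliasMap.Reader<FileRegion> reader =
        client.getReader(null, "BP-1234-10.1.1.1-1224");
    Optional<FileRegion> region = reader.resolve(new Block(blockId));
    System.out.println("resolved: " + region.isPresent());
    client.close();
  }
}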
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 146ae6c..6dd2d92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4817,9 +4817,27 @@
<property>
<name>dfs.provided.aliasmap.inmemory.dnrpc-address</name>
- <value>0.0.0.0:50200</value>
+ <value></value>
+ <description>
+ The address where the aliasmap server will be running. In the case of
+ HA/Federation, where multiple Namenodes exist and the Namenode is
+ configured to run the aliasmap server
+ (dfs.provided.aliasmap.inmemory.enabled is set to true),
+ the nameservice id is appended to the name, e.g.,
+ dfs.provided.aliasmap.inmemory.dnrpc-address.EXAMPLENAMESERVICE.
+ The value of this property takes the form host:rpc-port.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.provided.aliasmap.inmemory.rpc.bind-host</name>
+ <value></value>
<description>
- The address where the aliasmap server will be running
+ The actual address the in-memory aliasmap server will bind to.
+ If this optional address is set, it overrides the hostname portion of
+ dfs.provided.aliasmap.inmemory.dnrpc-address.
+ This is useful for making the Namenode listen on all interfaces by
+ setting it to 0.0.0.0.
</description>
</property>
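
For HA or federated deployments these keys are suffixed with the nameservice and namenode ids, as the test changes below do programmatically. A hand-written equivalent sketch (ids and hostname illustrative):

<property>
  <name>dfs.provided.aliasmap.inmemory.dnrpc-address.ns1.nn1</name>
  <value>nn1.example.com:50200</value>
</property>
<property>
  <name>dfs.provided.aliasmap.inmemory.rpc.bind-host.ns1.nn1</name>
  <value>0.0.0.0</value>
</property>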
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index c2e2a68..a2e5951 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1187,7 +1187,7 @@ public class MiniDFSCluster implements AutoCloseable {
}
- private void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
+ protected void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, int nnIndex)
throws IOException {
if (nameserviceId != null) {
@@ -1379,6 +1379,17 @@ public class MiniDFSCluster implements AutoCloseable {
return null;
}
+ public List<Integer> getNNIndexes(String nameserviceId) {
+ int count = 0;
+ List<Integer> nnIndexes = new ArrayList<>();
+ for (NameNodeInfo nn : namenodes.values()) {
+ if (nn.getNameserviceId().equals(nameserviceId)) {
+ nnIndexes.add(count);
+ }
+ count++;
+ }
+ return nnIndexes;
+ }
/**
* wait for the given namenode to get out of safemode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
index b9786a3..c21ff80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
@@ -227,7 +227,7 @@ public class MiniDFSNNTopology {
this.nnId = nnId;
}
- String getNnId() {
+ public String getNnId() {
return nnId;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
index 61a1558..f062633 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
@@ -32,6 +32,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -341,4 +342,10 @@ public class TestInMemoryLevelDBAliasMapClient {
assertThat(actualFileRegions).containsExactlyInAnyOrder(
expectedFileRegions.toArray(new FileRegion[0]));
}
+
+ @Test
+ public void testServerBindHost() throws Exception {
+ conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST, "0.0.0.0");
+ writeRead();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java
index 7d3ab0e..e3f4dec 100644
--- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java
+++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java
@@ -24,12 +24,18 @@ import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
-import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -44,14 +50,14 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
-import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -60,8 +66,10 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.LevelDBFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -71,6 +79,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
import org.junit.After;
import org.junit.Before;
@@ -80,6 +90,12 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LEVELDB_PATH;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import static org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.fileNameFromBlockPoolID;
import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR;
import static org.junit.Assert.*;
@@ -106,6 +122,7 @@ public class ITestProvidedImplementation {
private final int baseFileLen = 1024;
private long providedDataSize = 0;
private final String bpid = "BP-1234-10.1.1.1-1224";
+ private static final String clusterID = "CID-PROVIDED";
private Configuration conf;
private MiniDFSCluster cluster;
@@ -214,36 +231,78 @@ public class ITestProvidedImplementation {
StorageType[] storageTypes,
StorageType[][] storageTypesPerDatanode,
boolean doFormat, String[] racks) throws IOException {
+ startCluster(nspath, numDatanodes,
+ storageTypes, storageTypesPerDatanode,
+ doFormat, racks, null,
+ new MiniDFSCluster.Builder(conf));
+ }
+
+ void startCluster(Path nspath, int numDatanodes,
+ StorageType[] storageTypes,
+ StorageType[][] storageTypesPerDatanode,
+ boolean doFormat, String[] racks,
+ MiniDFSNNTopology topo,
+ MiniDFSCluster.Builder builder) throws IOException {
conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());
+ builder.format(doFormat)
+ .manageNameDfsDirs(doFormat)
+ .numDataNodes(numDatanodes)
+ .racks(racks);
if (storageTypesPerDatanode != null) {
- cluster = new MiniDFSCluster.Builder(conf)
- .format(doFormat)
- .manageNameDfsDirs(doFormat)
- .numDataNodes(numDatanodes)
- .storageTypes(storageTypesPerDatanode)
- .racks(racks)
- .build();
+ builder.storageTypes(storageTypesPerDatanode);
} else if (storageTypes != null) {
- cluster = new MiniDFSCluster.Builder(conf)
- .format(doFormat)
- .manageNameDfsDirs(doFormat)
- .numDataNodes(numDatanodes)
- .storagesPerDatanode(storageTypes.length)
- .storageTypes(storageTypes)
- .racks(racks)
- .build();
- } else {
- cluster = new MiniDFSCluster.Builder(conf)
- .format(doFormat)
- .manageNameDfsDirs(doFormat)
- .numDataNodes(numDatanodes)
- .racks(racks)
- .build();
+ builder.storagesPerDatanode(storageTypes.length)
+ .storageTypes(storageTypes);
}
+ if (topo != null) {
+ builder.nnTopology(topo);
+ // If HA or Federation is enabled and formatting is set to false,
+ // copy the FSImage to all Namenode directories.
+ if ((topo.isHA() || topo.isFederated()) && !doFormat) {
+ builder.manageNameDfsDirs(true);
+ builder.enableManagedDfsDirsRedundancy(false);
+ builder.manageNameDfsSharedDirs(false);
+ List<File> nnDirs =
+ getProvidedNamenodeDirs(MiniDFSCluster.getBaseDirectory(), topo);
+ for (File nnDir : nnDirs) {
+ MiniDFSCluster.copyNameDirs(
+ Collections.singletonList(nspath.toUri()),
+ Collections.singletonList(fileAsURI(nnDir)),
+ conf);
+ }
+ }
+ }
+ cluster = builder.build();
cluster.waitActive();
}
+ private static List<File> getProvidedNamenodeDirs(String baseDir,
+ MiniDFSNNTopology topo) {
+ List<File> nnDirs = new ArrayList<>();
+ int nsCounter = 0;
+ for (MiniDFSNNTopology.NSConf nsConf : topo.getNameservices()) {
+ int nnCounter = nsCounter;
+ for (MiniDFSNNTopology.NNConf nnConf : nsConf.getNNs()) {
+ if (providedNameservice.equals(nsConf.getId())) {
+ // only add the first one
+ File[] nnFiles =
+ MiniDFSCluster.getNameNodeDirectory(
+ baseDir, nsCounter, nnCounter);
+ if (nnFiles == null || nnFiles.length == 0) {
+ throw new RuntimeException("Failed to get a location for the"
+ + "Namenode directory for namespace: " + nsConf.getId()
+ + " and namenodeId: " + nnConf.getNnId());
+ }
+ nnDirs.add(nnFiles[0]);
+ }
+ nnCounter++;
+ }
+ nsCounter = nnCounter;
+ }
+ return nnDirs;
+ }
+
@Test(timeout=20000)
public void testLoadImage() throws Exception {
final long seed = r.nextLong();
@@ -405,8 +464,8 @@ public class ITestProvidedImplementation {
return ret;
}
- private void verifyFileSystemContents() throws Exception {
- FileSystem fs = cluster.getFileSystem();
+ private void verifyFileSystemContents(int nnIndex) throws Exception {
+ FileSystem fs = cluster.getFileSystem(nnIndex);
int count = 0;
// read NN metadata, verify contents match
for (TreePath e : new FSTreeWalk(providedPath, conf)) {
@@ -766,41 +825,255 @@ public class ITestProvidedImplementation {
}
}
-
- @Test
- public void testInMemoryAliasMap() throws Exception {
- conf.setClass(ImageWriter.Options.UGI_CLASS,
- FsUGIResolver.class, UGIResolver.class);
+ private File createInMemoryAliasMapImage() throws Exception {
+ conf.setClass(ImageWriter.Options.UGI_CLASS, FsUGIResolver.class,
+ UGIResolver.class);
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
InMemoryLevelDBAliasMapClient.class, BlockAliasMap.class);
- conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
- "localhost:32445");
+ conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, "localhost:32445");
File tempDirectory =
- Files.createTempDirectory("in-memory-alias-map").toFile();
- File leveDBPath = new File(tempDirectory, bpid);
- leveDBPath.mkdirs();
- conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
+ new File(new Path(nnDirPath, "in-memory-alias-map").toUri());
+ File levelDBDir = new File(tempDirectory, bpid);
+ levelDBDir.mkdirs();
+ conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
tempDirectory.getAbsolutePath());
- conf.setBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
- InMemoryLevelDBAliasMapServer levelDBAliasMapServer =
- new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init, bpid);
- levelDBAliasMapServer.setConf(conf);
- levelDBAliasMapServer.start();
+ conf.set(DFS_PROVIDED_ALIASMAP_LEVELDB_PATH,
+ tempDirectory.getAbsolutePath());
createImage(new FSTreeWalk(providedPath, conf),
nnDirPath,
- FixedBlockResolver.class, "",
- InMemoryLevelDBAliasMapClient.class);
- levelDBAliasMapServer.close();
+ FixedBlockResolver.class, clusterID,
+ LevelDBFileRegionAliasMap.class);
+
+ return tempDirectory;
+ }
+ @Test
+ public void testInMemoryAliasMap() throws Exception {
+ File aliasMapImage = createInMemoryAliasMapImage();
// start cluster with two datanodes,
// each with 1 PROVIDED volume and other DISK volume
+ conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
+ conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
startCluster(nnDirPath, 2,
new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
null, false);
- verifyFileSystemContents();
- FileUtils.deleteDirectory(tempDirectory);
+ verifyFileSystemContents(0);
+ FileUtils.deleteDirectory(aliasMapImage);
+ }
+
+ /**
+ * Find a free port that hasn't been assigned yet.
+ *
+ * @param usedPorts set of ports that have already been assigned.
+ * @param maxTrials maximum number of random ports to try before failure.
+ * @return an unassigned port.
+ */
+ private int getUnAssignedPort(Set<Integer> usedPorts, int maxTrials) {
+ int count = 0;
+ while (count < maxTrials) {
+ int port = NetUtils.getFreeSocketPort();
+ if (usedPorts.contains(port)) {
+ count++;
+ } else {
+ return port;
+ }
+ }
+ return -1;
+ }
+
+ private static String providedNameservice;
+
+ /**
+ * Extends the {@link MiniDFSCluster.Builder} to create instances of
+ * {@link MiniDFSClusterAliasMap}.
+ */
+ private static class MiniDFSClusterBuilderAliasMap
+ extends MiniDFSCluster.Builder {
+
+ MiniDFSClusterBuilderAliasMap(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public MiniDFSCluster build() throws IOException {
+ return new MiniDFSClusterAliasMap(this);
+ }
+ }
+
+ /**
+ * Extends {@link MiniDFSCluster} to correctly configure the InMemoryAliasMap.
+ */
+ private static class MiniDFSClusterAliasMap extends MiniDFSCluster {
+
+ private Map<String, Collection<URI>> formattedDirsByNamespaceId;
+ private Set<Integer> completedNNs;
+
+ MiniDFSClusterAliasMap(MiniDFSCluster.Builder builder) throws IOException {
+ super(builder);
+ }
+
+ @Override
+ protected void initNameNodeConf(Configuration conf, String nameserviceId,
+ int nsIndex, String nnId, boolean manageNameDfsDirs,
+ boolean enableManagedDfsDirsRedundancy, int nnIndex)
+ throws IOException {
+
+ if (formattedDirsByNamespaceId == null) {
+ formattedDirsByNamespaceId = new HashMap<>();
+ completedNNs = new HashSet<>();
+ }
+
+ super.initNameNodeConf(conf, nameserviceId, nsIndex, nnId,
+ manageNameDfsDirs, enableManagedDfsDirsRedundancy, nnIndex);
+
+ if (providedNameservice.equals(nameserviceId)) {
+ // configure the InMemoryAliasMap.
+ conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
+ String directory = conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR);
+ if (directory == null) {
+ throw new IllegalArgumentException("In-memory alias map is not"
+ + " configured with the proper location; set "
+ + DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR);
+ }
+ // get the name of the directory (final component in path) used for map.
+ // Assume that the aliasmap is configured with the same final
+ // component name in all Namenodes, but located under the path
+ // specified by DFS_NAMENODE_NAME_DIR_KEY.
+ String dirName = new Path(directory).getName();
+ String nnDir =
+ conf.getTrimmedStringCollection(DFS_NAMENODE_NAME_DIR_KEY)
+ .iterator().next();
+ conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
+ new File(new Path(nnDir, dirName).toUri()).getAbsolutePath());
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
+ } else {
+ if (!completedNNs.contains(nnIndex)) {
+ // format the NN directories for non-provided namespaces
+ // if the directory for a namespace has been formatted, copy it over.
+ Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
+ if (formattedDirsByNamespaceId.containsKey(nameserviceId)) {
+ copyNameDirs(formattedDirsByNamespaceId.get(nameserviceId),
+ namespaceDirs, conf);
+ } else {
+ for (URI nameDirUri : namespaceDirs) {
+ File nameDir = new File(nameDirUri);
+ if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
+ throw new IOException("Could not fully delete " + nameDir);
+ }
+ }
+ HdfsServerConstants.StartupOption.FORMAT.setClusterId(clusterID);
+ DFSTestUtil.formatNameNode(conf);
+ formattedDirsByNamespaceId.put(nameserviceId, namespaceDirs);
+ }
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, false);
+ completedNNs.add(nnIndex);
+ }
+ }
+ }
+ }
+
+ /**
+ * Configures the addresses of the InMemoryAliasMap.
+ *
+ * @param topology the MiniDFS topology to use.
+ * @param providedNameservice the nameservice id that supports provided.
+ */
+ private void configureAliasMapAddresses(MiniDFSNNTopology topology,
+ String providedNameservice) {
+ conf.unset(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS);
+ Set<Integer> assignedPorts = new HashSet<>();
+ for (MiniDFSNNTopology.NSConf nsConf : topology.getNameservices()) {
+ for (MiniDFSNNTopology.NNConf nnConf : nsConf.getNNs()) {
+ if (providedNameservice.equals(nsConf.getId())) {
+ String key =
+ DFSUtil.addKeySuffixes(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
+ nsConf.getId(), nnConf.getNnId());
+ int port = getUnAssignedPort(assignedPorts, 10);
+ if (port == -1) {
+ throw new RuntimeException("No free ports available");
+ }
+ assignedPorts.add(port);
+ conf.set(key, "127.0.0.1:" + port);
+
+ String bindHostKey =
+ DFSUtil.addKeySuffixes(
+ DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST,
+ nsConf.getId(), nnConf.getNnId());
+ conf.set(bindHostKey, "0.0.0.0");
+ }
+ }
+ }
+ }
+
+ /**
+ * Verify the mounted contents of the Filesystem.
+ *
+ * @param topology the topology of the cluster.
+ * @param providedNameservice the namespace id of the provided namenodes.
+ * @throws Exception
+ */
+ private void verifyPathsWithHAFailoverIfNecessary(MiniDFSNNTopology topology,
+ String providedNameservice) throws Exception {
+ List<Integer> nnIndexes = cluster.getNNIndexes(providedNameservice);
+ if (topology.isHA()) {
+ int nn1 = nnIndexes.get(0);
+ int nn2 = nnIndexes.get(1);
+ try {
+ verifyFileSystemContents(nn1);
+ fail("Read operation should fail as no Namenode is active");
+ } catch (RemoteException e) {
+ LOG.info("verifyPaths failed!. Expected exception: {}" + e);
+ }
+ cluster.transitionToActive(nn1);
+ LOG.info("Verifying data from NN with index = {}", nn1);
+ verifyFileSystemContents(nn1);
+ // transition to the second namenode.
+ cluster.transitionToStandby(nn1);
+ cluster.transitionToActive(nn2);
+ LOG.info("Verifying data from NN with index = {}", nn2);
+ verifyFileSystemContents(nn2);
+
+ cluster.shutdownNameNodes();
+ try {
+ verifyFileSystemContents(nn2);
+ fail("Read operation should fail as no Namenode is active");
+ } catch (NullPointerException e) {
+ LOG.info("verifyPaths failed!. Expected exception: {}" + e);
+ }
+ } else {
+ verifyFileSystemContents(nnIndexes.get(0));
+ }
+ }
+
+ @Test
+ public void testInMemoryAliasMapMultiTopologies() throws Exception {
+ MiniDFSNNTopology[] topologies =
+ new MiniDFSNNTopology[] {
+ MiniDFSNNTopology.simpleHATopology(),
+ MiniDFSNNTopology.simpleFederatedTopology(3),
+ MiniDFSNNTopology.simpleHAFederatedTopology(3)
+ };
+
+ for (MiniDFSNNTopology topology : topologies) {
+ LOG.info("Starting test with topology with HA = {}, federation = {}",
+ topology.isHA(), topology.isFederated());
+ setSeed();
+ createInMemoryAliasMapImage();
+ conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
+ conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
+ providedNameservice = topology.getNameservices().get(0).getId();
+ // configure the AliasMap addresses
+ configureAliasMapAddresses(topology, providedNameservice);
+ startCluster(nnDirPath, 2,
+ new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
+ null, false, null, topology,
+ new MiniDFSClusterBuilderAliasMap(conf));
+
+ verifyPathsWithHAFailoverIfNecessary(topology, providedNameservice);
+ shutdown();
+ }
}
private DatanodeDescriptor getDatanodeDescriptor(DatanodeManager dnm,
@@ -919,7 +1192,7 @@ public class ITestProvidedImplementation {
startCluster(nnDirPath, racks.length,
new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
null, false, racks);
- verifyFileSystemContents();
+ verifyFileSystemContents(0);
setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
cluster.shutdown();
}