Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/08 20:58:04 UTC
[32/45] hadoop git commit: HDFS-11546. Federation Router RPC server.
Contributed by Jason Kace and Inigo Goiri.
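For context, the Router introduced here exposes a NameNode-compatible ClientProtocol RPC endpoint, so an unmodified HDFS client can be pointed at it directly. A minimal sketch, assuming a router whose RPC server is reachable at the hypothetical address router-host:8888 (the tests in this patch bind to 127.0.0.1:0, i.e. any free port):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class RouterClientSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Point the default filesystem at the router RPC address instead of a NN
      conf.set("fs.defaultFS", "hdfs://router-host:8888");
      FileSystem fs = FileSystem.get(conf);
      // Paths such as /ns0/testdir resolve through the router's mount table
      System.out.println(fs.listStatus(new Path("/")).length);
    }
  }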
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
index ee6f57d..2875750 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.util.Time;
/**
* In-memory cache/mock of a namenode and file resolver. Stores the most
- * recently updated NN information for each nameservice and block pool. Also
+ * recently updated NN information for each nameservice and block pool. It also
* stores a virtual mount table for resolving global namespace paths to local NN
* paths.
*/
@@ -51,82 +51,93 @@ public class MockResolver
implements ActiveNamenodeResolver, FileSubclusterResolver {
private Map<String, List<? extends FederationNamenodeContext>> resolver =
- new HashMap<String, List<? extends FederationNamenodeContext>>();
- private Map<String, List<RemoteLocation>> locations =
- new HashMap<String, List<RemoteLocation>>();
- private Set<FederationNamespaceInfo> namespaces =
- new HashSet<FederationNamespaceInfo>();
+ new HashMap<>();
+ private Map<String, List<RemoteLocation>> locations = new HashMap<>();
+ private Set<FederationNamespaceInfo> namespaces = new HashSet<>();
private String defaultNamespace = null;
+
public MockResolver(Configuration conf, StateStoreService store) {
this.cleanRegistrations();
}
- public void addLocation(String mount, String nameservice, String location) {
- RemoteLocation remoteLocation = new RemoteLocation(nameservice, location);
- List<RemoteLocation> locationsList = locations.get(mount);
+ public void addLocation(String mount, String nsId, String location) {
+ List<RemoteLocation> locationsList = this.locations.get(mount);
if (locationsList == null) {
- locationsList = new LinkedList<RemoteLocation>();
- locations.put(mount, locationsList);
+ locationsList = new LinkedList<>();
+ this.locations.put(mount, locationsList);
}
+
+ final RemoteLocation remoteLocation = new RemoteLocation(nsId, location);
if (!locationsList.contains(remoteLocation)) {
locationsList.add(remoteLocation);
}
if (this.defaultNamespace == null) {
- this.defaultNamespace = nameservice;
+ this.defaultNamespace = nsId;
}
}
public synchronized void cleanRegistrations() {
- this.resolver =
- new HashMap<String, List<? extends FederationNamenodeContext>>();
- this.namespaces = new HashSet<FederationNamespaceInfo>();
+ this.resolver = new HashMap<>();
+ this.namespaces = new HashSet<>();
}
@Override
public void updateActiveNamenode(
- String ns, InetSocketAddress successfulAddress) {
+ String nsId, InetSocketAddress successfulAddress) {
String address = successfulAddress.getHostName() + ":" +
successfulAddress.getPort();
- String key = ns;
+ String key = nsId;
if (key != null) {
// Update the active entry
@SuppressWarnings("unchecked")
- List<FederationNamenodeContext> iterator =
- (List<FederationNamenodeContext>) resolver.get(key);
- for (FederationNamenodeContext namenode : iterator) {
+ List<FederationNamenodeContext> namenodes =
+ (List<FederationNamenodeContext>) this.resolver.get(key);
+ for (FederationNamenodeContext namenode : namenodes) {
if (namenode.getRpcAddress().equals(address)) {
MockNamenodeContext nn = (MockNamenodeContext) namenode;
nn.setState(FederationNamenodeServiceState.ACTIVE);
break;
}
}
- Collections.sort(iterator, new NamenodePriorityComparator());
+ // Sorting modifies the list in place, so synchronize on it
+ synchronized(namenodes) {
+ Collections.sort(namenodes, new NamenodePriorityComparator());
+ }
}
}
@Override
public List<? extends FederationNamenodeContext>
getNamenodesForNameserviceId(String nameserviceId) {
- return resolver.get(nameserviceId);
+ // Return a copy of the list because it is updated periodically
+ List<? extends FederationNamenodeContext> namenodes =
+ this.resolver.get(nameserviceId);
+ return Collections.unmodifiableList(new ArrayList<>(namenodes));
}
@Override
public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
String blockPoolId) {
- return resolver.get(blockPoolId);
+ // Return a copy of the list because it is updated periodically
+ List<? extends FederationNamenodeContext> namenodes =
+ this.resolver.get(blockPoolId);
+ return Collections.unmodifiableList(new ArrayList<>(namenodes));
}
private static class MockNamenodeContext
implements FederationNamenodeContext {
+
+ private String namenodeId;
+ private String nameserviceId;
+
private String webAddress;
private String rpcAddress;
private String serviceAddress;
private String lifelineAddress;
- private String namenodeId;
- private String nameserviceId;
+
private FederationNamenodeServiceState state;
private long dateModified;
@@ -197,6 +208,7 @@ public class MockResolver
@Override
public synchronized boolean registerNamenode(NamenodeStatusReport report)
throws IOException {
+
MockNamenodeContext context = new MockNamenodeContext(
report.getRpcAddress(), report.getServiceAddress(),
report.getLifelineAddress(), report.getWebAddress(),
@@ -205,13 +217,14 @@ public class MockResolver
String nsId = report.getNameserviceId();
String bpId = report.getBlockPoolId();
String cId = report.getClusterId();
+
@SuppressWarnings("unchecked")
List<MockNamenodeContext> existingItems =
- (List<MockNamenodeContext>) resolver.get(nsId);
+ (List<MockNamenodeContext>) this.resolver.get(nsId);
if (existingItems == null) {
- existingItems = new ArrayList<MockNamenodeContext>();
- resolver.put(bpId, existingItems);
- resolver.put(nsId, existingItems);
+ existingItems = new ArrayList<>();
+ this.resolver.put(bpId, existingItems);
+ this.resolver.put(nsId, existingItems);
}
boolean added = false;
for (int i=0; i<existingItems.size() && !added; i++) {
@@ -227,7 +240,7 @@ public class MockResolver
Collections.sort(existingItems, new NamenodePriorityComparator());
FederationNamespaceInfo info = new FederationNamespaceInfo(bpId, cId, nsId);
- namespaces.add(info);
+ this.namespaces.add(info);
return true;
}
@@ -238,16 +251,13 @@ public class MockResolver
@Override
public PathLocation getDestinationForPath(String path) throws IOException {
- String finalPath = null;
- String nameservice = null;
- Set<String> namespaceSet = new HashSet<String>();
- LinkedList<RemoteLocation> remoteLocations =
- new LinkedList<RemoteLocation>();
- for(String key : this.locations.keySet()) {
- if(path.startsWith(key)) {
+ Set<String> namespaceSet = new HashSet<>();
+ List<RemoteLocation> remoteLocations = new LinkedList<>();
+ for (String key : this.locations.keySet()) {
+ if (path.startsWith(key)) {
for (RemoteLocation location : this.locations.get(key)) {
- finalPath = location.getDest() + path.substring(key.length());
- nameservice = location.getNameserviceId();
+ String finalPath = location.getDest() + path.substring(key.length());
+ String nameservice = location.getNameserviceId();
RemoteLocation remoteLocation =
new RemoteLocation(nameservice, finalPath);
remoteLocations.add(remoteLocation);
@@ -265,7 +275,7 @@ public class MockResolver
@Override
public List<String> getMountPoints(String path) throws IOException {
- List<String> mounts = new ArrayList<String>();
+ List<String> mounts = new ArrayList<>();
if (path.equals("/")) {
// Mounts only supported under root level
for (String mount : this.locations.keySet()) {
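A minimal sketch of driving MockResolver in a test, based only on the methods shown above and assuming a null StateStoreService is acceptable (the constructor merely cleans registrations):

  MockResolver resolver = new MockResolver(new Configuration(), null);
  resolver.addLocation("/ns0", "ns0", "/target-ns0");
  resolver.addLocation("/", "ns0", "/");

  // /ns0/testdir resolves to /target-ns0/testdir on nameservice ns0
  PathLocation destination = resolver.getDestinationForPath("/ns0/testdir");
  // Mount points directly under the root
  List<String> mounts = resolver.getMountPoints("/");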
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index 16d624c..39fcf7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.federation;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
/**
* Constructs a router configuration with individual features enabled/disabled.
@@ -26,15 +27,32 @@ public class RouterConfigBuilder {
private Configuration conf;
+ private boolean enableRpcServer = false;
+
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
}
public RouterConfigBuilder() {
- this.conf = new Configuration();
+ this.conf = new Configuration(false);
+ }
+
+ public RouterConfigBuilder all() {
+ this.enableRpcServer = true;
+ return this;
+ }
+
+ public RouterConfigBuilder rpc(boolean enable) {
+ this.enableRpcServer = enable;
+ return this;
+ }
+
+ public RouterConfigBuilder rpc() {
+ return this.rpc(true);
}
public Configuration build() {
+ conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer);
return conf;
}
}
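The builder is exercised later in this patch (TestRouter and TestRouterRpc); typical usage looks like the following sketch, where baseConf stands for an assumed pre-existing Configuration:

  // Enable only the RPC service on a fresh, empty configuration
  Configuration rpcOnly = new RouterConfigBuilder().rpc().build();
  // Enable every Router service on top of an existing configuration
  Configuration all = new RouterConfigBuilder(baseConf).all().build();
  // In both cases build() sets DFSConfigKeys.DFS_ROUTER_RPC_ENABLE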
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
index 55d04ad..4031b7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
@@ -17,27 +17,44 @@
*/
package org.apache.hadoop.hdfs.server.federation;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.waitNamenodeRegistered;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Random;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -46,16 +63,49 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Test utility to mimic a federated HDFS cluster with a router.
+ * Test utility to mimic a federated HDFS cluster with multiple routers.
*/
public class RouterDFSCluster {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterDFSCluster.class);
+
+ public static final String TEST_STRING = "teststring";
+ public static final String TEST_DIR = "testdir";
+ public static final String TEST_FILE = "testfile";
+
+
+ /** Nameservices in the federated cluster. */
+ private List<String> nameservices;
+ /** Namenodes in the federated cluster. */
+ private List<NamenodeContext> namenodes;
+ /** Routers in the federated cluster. */
+ private List<RouterContext> routers;
+ /** If the Namenodes are in high availability.*/
+ private boolean highAvailability;
+
+ /** Mini cluster. */
+ private MiniDFSCluster cluster;
+
+ /** Router configuration overrides. */
+ private Configuration routerOverrides;
+ /** Namenode configuration overrides. */
+ private Configuration namenodeOverrides;
+
+
/**
* Router context.
*/
@@ -69,13 +119,14 @@ public class RouterDFSCluster {
private Configuration conf;
private URI fileSystemUri;
- public RouterContext(Configuration conf, String ns, String nn)
+ public RouterContext(Configuration conf, String nsId, String nnId)
throws URISyntaxException {
- this.namenodeId = nn;
- this.nameserviceId = ns;
this.conf = conf;
- router = new Router();
- router.init(conf);
+ this.nameserviceId = nsId;
+ this.namenodeId = nnId;
+
+ this.router = new Router();
+ this.router.init(conf);
}
public Router getRouter() {
@@ -99,18 +150,30 @@ public class RouterDFSCluster {
}
public void initRouter() throws URISyntaxException {
+ // Store the bound points for the router interfaces
+ InetSocketAddress rpcAddress = router.getRpcServerAddress();
+ if (rpcAddress != null) {
+ this.rpcPort = rpcAddress.getPort();
+ this.fileSystemUri =
+ URI.create("hdfs://" + NetUtils.getHostPortString(rpcAddress));
+ // Override the default FS to point to the router RPC
+ DistributedFileSystem.setDefaultUri(conf, fileSystemUri);
+ try {
+ this.fileContext = FileContext.getFileContext(conf);
+ } catch (UnsupportedFileSystemException e) {
+ this.fileContext = null;
+ }
+ }
}
- public DistributedFileSystem getFileSystem() throws IOException {
- DistributedFileSystem fs =
- (DistributedFileSystem) DistributedFileSystem.get(conf);
- return fs;
+ public FileSystem getFileSystem() throws IOException {
+ return DistributedFileSystem.get(conf);
}
public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException {
- LOG.info("Connecting to router at " + fileSystemUri);
+ LOG.info("Connecting to router at {}", fileSystemUri);
return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
@Override
public DFSClient run() throws IOException {
@@ -120,9 +183,8 @@ public class RouterDFSCluster {
}
public DFSClient getClient() throws IOException, URISyntaxException {
-
if (client == null) {
- LOG.info("Connecting to router at " + fileSystemUri);
+ LOG.info("Connecting to router at {}", fileSystemUri);
client = new DFSClient(fileSystemUri, conf);
}
return client;
@@ -130,9 +192,10 @@ public class RouterDFSCluster {
}
/**
- * Namenode context.
+ * Namenode context in the federated cluster.
*/
public class NamenodeContext {
+ private Configuration conf;
private NameNode namenode;
private String nameserviceId;
private String namenodeId;
@@ -143,14 +206,13 @@ public class RouterDFSCluster {
private int httpPort;
private URI fileSystemUri;
private int index;
- private Configuration conf;
private DFSClient client;
- public NamenodeContext(Configuration conf, String ns, String nn,
- int index) {
+ public NamenodeContext(
+ Configuration conf, String nsId, String nnId, int index) {
this.conf = conf;
- this.namenodeId = nn;
- this.nameserviceId = ns;
+ this.nameserviceId = nsId;
+ this.namenodeId = nnId;
this.index = index;
}
@@ -170,20 +232,19 @@ public class RouterDFSCluster {
return this.fileContext;
}
- public void setNamenode(NameNode n) throws URISyntaxException {
- namenode = n;
+ public void setNamenode(NameNode nn) throws URISyntaxException {
+ this.namenode = nn;
- // Store the bound ports and override the default FS with the local NN's
- // RPC
- rpcPort = n.getNameNodeAddress().getPort();
- servicePort = n.getServiceRpcAddress().getPort();
- lifelinePort = n.getServiceRpcAddress().getPort();
- httpPort = n.getHttpAddress().getPort();
- fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort());
- DistributedFileSystem.setDefaultUri(conf, fileSystemUri);
+ // Store the bound ports and override the default FS with the local NN RPC
+ this.rpcPort = nn.getNameNodeAddress().getPort();
+ this.servicePort = nn.getServiceRpcAddress().getPort();
+ this.lifelinePort = nn.getServiceRpcAddress().getPort();
+ this.httpPort = nn.getHttpAddress().getPort();
+ this.fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort());
+ DistributedFileSystem.setDefaultUri(this.conf, this.fileSystemUri);
try {
- this.fileContext = FileContext.getFileContext(conf);
+ this.fileContext = FileContext.getFileContext(this.conf);
} catch (UnsupportedFileSystemException e) {
this.fileContext = null;
}
@@ -205,10 +266,8 @@ public class RouterDFSCluster {
return namenode.getHttpAddress().getHostName() + ":" + httpPort;
}
- public DistributedFileSystem getFileSystem() throws IOException {
- DistributedFileSystem fs =
- (DistributedFileSystem) DistributedFileSystem.get(conf);
- return fs;
+ public FileSystem getFileSystem() throws IOException {
+ return DistributedFileSystem.get(conf);
}
public void resetClient() {
@@ -218,7 +277,7 @@ public class RouterDFSCluster {
public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException {
- LOG.info("Connecting to namenode at " + fileSystemUri);
+ LOG.info("Connecting to namenode at {}", fileSystemUri);
return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
@Override
public DFSClient run() throws IOException {
@@ -229,7 +288,7 @@ public class RouterDFSCluster {
public DFSClient getClient() throws IOException, URISyntaxException {
if (client == null) {
- LOG.info("Connecting to namenode at " + fileSystemUri);
+ LOG.info("Connecting to namenode at {}", fileSystemUri);
client = new DFSClient(fileSystemUri, conf);
}
return client;
@@ -244,36 +303,20 @@ public class RouterDFSCluster {
}
}
- public static final String NAMENODE1 = "nn0";
- public static final String NAMENODE2 = "nn1";
- public static final String NAMENODE3 = "nn2";
- public static final String TEST_STRING = "teststring";
- public static final String TEST_DIR = "testdir";
- public static final String TEST_FILE = "testfile";
-
- private List<String> nameservices;
- private List<RouterContext> routers;
- private List<NamenodeContext> namenodes;
- private static final Log LOG = LogFactory.getLog(RouterDFSCluster.class);
- private MiniDFSCluster cluster;
- private boolean highAvailability;
-
- protected static final int DEFAULT_HEARTBEAT_INTERVAL = 5;
- protected static final int DEFAULT_CACHE_INTERVAL_SEC = 5;
- private Configuration routerOverrides;
- private Configuration namenodeOverrides;
-
- private static final String NAMENODES = NAMENODE1 + "," + NAMENODE2;
-
- public RouterDFSCluster(boolean ha, int numNameservices) {
- this(ha, numNameservices, 2);
- }
-
public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) {
this.highAvailability = ha;
configureNameservices(numNameservices, numNamenodes);
}
+ public RouterDFSCluster(boolean ha, int numNameservices) {
+ this(ha, numNameservices, 2);
+ }
+
+ /**
+ * Add configuration settings to override default Router settings.
+ *
+ * @param conf Router configuration overrides.
+ */
public void addRouterOverrides(Configuration conf) {
if (this.routerOverrides == null) {
this.routerOverrides = conf;
@@ -282,6 +325,11 @@ public class RouterDFSCluster {
}
}
+ /**
+ * Add configuration settings to override default Namenode settings.
+ *
+ * @param conf Namenode configuration overrides.
+ */
public void addNamenodeOverrides(Configuration conf) {
if (this.namenodeOverrides == null) {
this.namenodeOverrides = conf;
@@ -290,124 +338,134 @@ public class RouterDFSCluster {
}
}
- public Configuration generateNamenodeConfiguration(
- String defaultNameserviceId) {
- Configuration c = new HdfsConfiguration();
+ /**
+ * Generate the configuration for a client.
+ *
+ * @param nsId Nameservice identifier.
+ * @return New namenode configuration.
+ */
+ public Configuration generateNamenodeConfiguration(String nsId) {
+ Configuration conf = new HdfsConfiguration();
- c.set(DFSConfigKeys.DFS_NAMESERVICES, getNameservicesKey());
- c.set("fs.defaultFS", "hdfs://" + defaultNameserviceId);
+ conf.set(DFS_NAMESERVICES, getNameservicesKey());
+ conf.set(FS_DEFAULT_NAME_KEY, "hdfs://" + nsId);
for (String ns : nameservices) {
if (highAvailability) {
- c.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, NAMENODES);
+ conf.set(
+ DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
+ NAMENODES[0] + "," + NAMENODES[1]);
}
for (NamenodeContext context : getNamenodes(ns)) {
String suffix = context.getConfSuffix();
- c.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
+ conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
"127.0.0.1:" + context.rpcPort);
- c.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
+ conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
"127.0.0.1:" + context.httpPort);
- c.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
+ conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
"0.0.0.0");
}
}
- if (namenodeOverrides != null) {
- c.addResource(namenodeOverrides);
+ if (this.namenodeOverrides != null) {
+ conf.addResource(this.namenodeOverrides);
}
- return c;
+ return conf;
}
+ /**
+ * Generate the configuration for a client.
+ *
+ * @return New configuration for a client.
+ */
public Configuration generateClientConfiguration() {
- Configuration conf = new HdfsConfiguration();
- conf.addResource(generateNamenodeConfiguration(getNameservices().get(0)));
+ Configuration conf = new HdfsConfiguration(false);
+ String ns0 = getNameservices().get(0);
+ conf.addResource(generateNamenodeConfiguration(ns0));
return conf;
}
- public Configuration generateRouterConfiguration(String localNameserviceId,
- String localNamenodeId) throws IOException {
- Configuration conf = new HdfsConfiguration();
- conf.addResource(generateNamenodeConfiguration(localNameserviceId));
+ /**
+ * Generate the configuration for a Router.
+ *
+ * @param nsId Nameservice identifier.
+ * @param nnId Namenode identifier.
+ * @return New configuration for a Router.
+ */
+ public Configuration generateRouterConfiguration(String nsId, String nnId) {
+
+ Configuration conf = new HdfsConfiguration(false);
+ conf.addResource(generateNamenodeConfiguration(nsId));
+
+ conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, 10);
+ conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
+ conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
+
+ conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0));
// Use mock resolver classes
- conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
- MockResolver.class.getCanonicalName());
- conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
- MockResolver.class.getCanonicalName());
+ conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+ MockResolver.class, ActiveNamenodeResolver.class);
+ conf.setClass(FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+ MockResolver.class, FileSubclusterResolver.class);
// Set the nameservice ID for the default NN monitor
- conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, localNameserviceId);
-
- if (localNamenodeId != null) {
- conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, localNamenodeId);
+ conf.set(DFS_NAMESERVICE_ID, nsId);
+ if (nnId != null) {
+ conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
}
- StringBuilder routerBuilder = new StringBuilder();
- for (String ns : nameservices) {
- for (NamenodeContext context : getNamenodes(ns)) {
- String suffix = context.getConfSuffix();
-
- if (routerBuilder.length() != 0) {
- routerBuilder.append(",");
- }
- routerBuilder.append(suffix);
+ // Add custom overrides if available
+ if (this.routerOverrides != null) {
+ for (Entry<String, String> entry : this.routerOverrides) {
+ String confKey = entry.getKey();
+ String confValue = entry.getValue();
+ conf.set(confKey, confValue);
}
}
-
return conf;
}
public void configureNameservices(int numNameservices, int numNamenodes) {
- nameservices = new ArrayList<String>();
- for (int i = 0; i < numNameservices; i++) {
- nameservices.add("ns" + i);
- }
- namenodes = new ArrayList<NamenodeContext>();
- int index = 0;
- for (String ns : nameservices) {
- Configuration nnConf = generateNamenodeConfiguration(ns);
- if (highAvailability) {
- NamenodeContext context =
- new NamenodeContext(nnConf, ns, NAMENODE1, index);
- namenodes.add(context);
- index++;
-
- if (numNamenodes > 1) {
- context = new NamenodeContext(nnConf, ns, NAMENODE2, index + 1);
- namenodes.add(context);
- index++;
- }
+ this.nameservices = new ArrayList<>();
+ this.namenodes = new ArrayList<>();
- if (numNamenodes > 2) {
- context = new NamenodeContext(nnConf, ns, NAMENODE3, index + 1);
- namenodes.add(context);
- index++;
- }
+ NamenodeContext context = null;
+ int nnIndex = 0;
+ for (int i=0; i<numNameservices; i++) {
+ String ns = "ns" + i;
+ this.nameservices.add("ns" + i);
+ Configuration nnConf = generateNamenodeConfiguration(ns);
+ if (!highAvailability) {
+ context = new NamenodeContext(nnConf, ns, null, nnIndex++);
+ this.namenodes.add(context);
} else {
- NamenodeContext context = new NamenodeContext(nnConf, ns, null, index);
- namenodes.add(context);
- index++;
+ for (int j=0; j<numNamenodes; j++) {
+ context = new NamenodeContext(nnConf, ns, NAMENODES[j], nnIndex++);
+ this.namenodes.add(context);
+ }
}
}
}
public String getNameservicesKey() {
- StringBuilder ns = new StringBuilder();
- for (int i = 0; i < nameservices.size(); i++) {
- if (i > 0) {
- ns.append(",");
+ StringBuilder sb = new StringBuilder();
+ for (String nsId : this.nameservices) {
+ if (sb.length() > 0) {
+ sb.append(",");
}
- ns.append(nameservices.get(i));
+ sb.append(nsId);
}
- return ns.toString();
+ return sb.toString();
}
public String getRandomNameservice() {
Random r = new Random();
- return nameservices.get(r.nextInt(nameservices.size()));
+ int randIndex = r.nextInt(nameservices.size());
+ return nameservices.get(randIndex);
}
public List<String> getNameservices() {
@@ -415,7 +473,7 @@ public class RouterDFSCluster {
}
public List<NamenodeContext> getNamenodes(String nameservice) {
- ArrayList<NamenodeContext> nns = new ArrayList<NamenodeContext>();
+ List<NamenodeContext> nns = new ArrayList<>();
for (NamenodeContext c : namenodes) {
if (c.nameserviceId.equals(nameservice)) {
nns.add(c);
@@ -426,23 +484,23 @@ public class RouterDFSCluster {
public NamenodeContext getRandomNamenode() {
Random rand = new Random();
- return namenodes.get(rand.nextInt(namenodes.size()));
+ int i = rand.nextInt(this.namenodes.size());
+ return this.namenodes.get(i);
}
public List<NamenodeContext> getNamenodes() {
- return namenodes;
+ return this.namenodes;
}
public boolean isHighAvailability() {
return highAvailability;
}
- public NamenodeContext getNamenode(String nameservice,
- String namenode) {
- for (NamenodeContext c : namenodes) {
+ public NamenodeContext getNamenode(String nameservice, String namenode) {
+ for (NamenodeContext c : this.namenodes) {
if (c.nameserviceId.equals(nameservice)) {
- if (namenode == null || c.namenodeId == null || namenode.isEmpty()
- || c.namenodeId.isEmpty()) {
+ if (namenode == null || namenode.isEmpty() ||
+ c.namenodeId == null || c.namenodeId.isEmpty()) {
return c;
} else if (c.namenodeId.equals(namenode)) {
return c;
@@ -453,7 +511,7 @@ public class RouterDFSCluster {
}
public List<RouterContext> getRouters(String nameservice) {
- ArrayList<RouterContext> nns = new ArrayList<RouterContext>();
+ List<RouterContext> nns = new ArrayList<>();
for (RouterContext c : routers) {
if (c.nameserviceId.equals(nameservice)) {
nns.add(c);
@@ -462,14 +520,13 @@ public class RouterDFSCluster {
return nns;
}
- public RouterContext getRouterContext(String nameservice,
- String namenode) {
+ public RouterContext getRouterContext(String nsId, String nnId) {
for (RouterContext c : routers) {
- if (namenode == null) {
+ if (nnId == null) {
return c;
}
- if (c.namenodeId.equals(namenode)
- && c.nameserviceId.equals(nameservice)) {
+ if (c.namenodeId.equals(nnId) &&
+ c.nameserviceId.equals(nsId)) {
return c;
}
}
@@ -485,10 +542,10 @@ public class RouterDFSCluster {
return routers;
}
- public RouterContext buildRouter(String nameservice, String namenode)
+ public RouterContext buildRouter(String nsId, String nnId)
throws URISyntaxException, IOException {
- Configuration config = generateRouterConfiguration(nameservice, namenode);
- RouterContext rc = new RouterContext(config, nameservice, namenode);
+ Configuration config = generateRouterConfiguration(nsId, nnId);
+ RouterContext rc = new RouterContext(config, nsId, nnId);
return rc;
}
@@ -500,10 +557,9 @@ public class RouterDFSCluster {
try {
MiniDFSNNTopology topology = new MiniDFSNNTopology();
for (String ns : nameservices) {
-
NSConf conf = new MiniDFSNNTopology.NSConf(ns);
if (highAvailability) {
- for(int i = 0; i < namenodes.size()/nameservices.size(); i++) {
+ for (int i=0; i<namenodes.size()/nameservices.size(); i++) {
NNConf nnConf = new MiniDFSNNTopology.NNConf("nn" + i);
conf.addNN(nnConf);
}
@@ -516,11 +572,15 @@ public class RouterDFSCluster {
topology.setFederation(true);
// Start mini DFS cluster
- Configuration nnConf = generateNamenodeConfiguration(nameservices.get(0));
+ String ns0 = nameservices.get(0);
+ Configuration nnConf = generateNamenodeConfiguration(ns0);
if (overrideConf != null) {
nnConf.addResource(overrideConf);
}
- cluster = new MiniDFSCluster.Builder(nnConf).nnTopology(topology).build();
+ cluster = new MiniDFSCluster.Builder(nnConf)
+ .numDataNodes(nameservices.size()*2)
+ .nnTopology(topology)
+ .build();
cluster.waitActive();
// Store NN pointers
@@ -530,28 +590,32 @@ public class RouterDFSCluster {
}
} catch (Exception e) {
- LOG.error("Cannot start Router DFS cluster: " + e.getMessage(), e);
- cluster.shutdown();
+ LOG.error("Cannot start Router DFS cluster: {}", e.getMessage(), e);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
}
public void startRouters()
throws InterruptedException, URISyntaxException, IOException {
- // Create routers
- routers = new ArrayList<RouterContext>();
- for (String ns : nameservices) {
+ // Create one router per nameservice
+ this.routers = new ArrayList<>();
+ for (String ns : this.nameservices) {
for (NamenodeContext context : getNamenodes(ns)) {
- routers.add(buildRouter(ns, context.namenodeId));
+ RouterContext router = buildRouter(ns, context.namenodeId);
+ this.routers.add(router);
}
}
// Start all routers
- for (RouterContext router : routers) {
+ for (RouterContext router : this.routers) {
router.router.start();
}
+
// Wait until all routers are active and record their ports
- for (RouterContext router : routers) {
+ for (RouterContext router : this.routers) {
waitActive(router);
router.initRouter();
}
@@ -570,22 +634,21 @@ public class RouterDFSCluster {
}
Thread.sleep(1000);
}
- assertFalse(
- "Timeout waiting for " + router.router.toString() + " to activate.",
- true);
+ fail("Timeout waiting for " + router.router + " to activate");
}
-
public void registerNamenodes() throws IOException {
- for (RouterContext r : routers) {
+ for (RouterContext r : this.routers) {
ActiveNamenodeResolver resolver = r.router.getNamenodeResolver();
- for (NamenodeContext nn : namenodes) {
+ for (NamenodeContext nn : this.namenodes) {
// Generate a report
- NamenodeStatusReport report = new NamenodeStatusReport(nn.nameserviceId,
- nn.namenodeId, nn.getRpcAddress(), nn.getServiceAddress(),
+ NamenodeStatusReport report = new NamenodeStatusReport(
+ nn.nameserviceId, nn.namenodeId,
+ nn.getRpcAddress(), nn.getServiceAddress(),
nn.getLifelineAddress(), nn.getHttpAddress());
- report.setNamespaceInfo(nn.namenode.getNamesystem().getFSImage()
- .getStorage().getNamespaceInfo());
+ FSImage fsImage = nn.namenode.getNamesystem().getFSImage();
+ NamespaceInfo nsInfo = fsImage.getStorage().getNamespaceInfo();
+ report.setNamespaceInfo(nsInfo);
// Determine HA state from nn public state string
String nnState = nn.namenode.getState();
@@ -606,74 +669,97 @@ public class RouterDFSCluster {
public void waitNamenodeRegistration()
throws InterruptedException, IllegalStateException, IOException {
- for (RouterContext r : routers) {
- for (NamenodeContext nn : namenodes) {
- FederationTestUtils.waitNamenodeRegistered(
- r.router.getNamenodeResolver(), nn.nameserviceId, nn.namenodeId,
- null);
+ for (RouterContext r : this.routers) {
+ Router router = r.router;
+ for (NamenodeContext nn : this.namenodes) {
+ ActiveNamenodeResolver nnResolver = router.getNamenodeResolver();
+ waitNamenodeRegistered(
+ nnResolver, nn.nameserviceId, nn.namenodeId, null);
}
}
}
public void waitRouterRegistrationQuorum(RouterContext router,
- FederationNamenodeServiceState state, String nameservice, String namenode)
+ FederationNamenodeServiceState state, String nsId, String nnId)
throws InterruptedException, IOException {
- LOG.info("Waiting for NN - " + nameservice + ":" + namenode
- + " to transition to state - " + state);
- FederationTestUtils.waitNamenodeRegistered(
- router.router.getNamenodeResolver(), nameservice, namenode, state);
+ LOG.info("Waiting for NN {} {} to transition to {}", nsId, nnId, state);
+ ActiveNamenodeResolver nnResolver = router.router.getNamenodeResolver();
+ waitNamenodeRegistered(nnResolver, nsId, nnId, state);
}
- public String getFederatedPathForNameservice(String ns) {
- return "/" + ns;
+ /**
+ * Get the federated path for a nameservice.
+ * @param nsId Nameservice identifier.
+ * @return Path in the Router.
+ */
+ public String getFederatedPathForNS(String nsId) {
+ return "/" + nsId;
}
- public String getNamenodePathForNameservice(String ns) {
- return "/target-" + ns;
+ /**
+ * Get the namenode path for a nameservice.
+ * @param nsId Nameservice identifier.
+ * @return Path in the Namenode.
+ */
+ public String getNamenodePathForNS(String nsId) {
+ return "/target-" + nsId;
}
/**
- * @return example:
+ * Get the federated test directory for a nameservice.
+ * @param nsId Nameservice identifier.
+ * @return Example:
* <ul>
* <li>/ns0/testdir which maps to ns0->/target-ns0/testdir
* </ul>
*/
- public String getFederatedTestDirectoryForNameservice(String ns) {
- return getFederatedPathForNameservice(ns) + "/" + TEST_DIR;
+ public String getFederatedTestDirectoryForNS(String nsId) {
+ return getFederatedPathForNS(nsId) + "/" + TEST_DIR;
}
/**
+ * Get the namenode test directory for a nameservice.
+ * @param nsId Nameservice identifier.
* @return example:
* <ul>
* <li>/target-ns0/testdir
* </ul>
*/
- public String getNamenodeTestDirectoryForNameservice(String ns) {
- return getNamenodePathForNameservice(ns) + "/" + TEST_DIR;
+ public String getNamenodeTestDirectoryForNS(String nsId) {
+ return getNamenodePathForNS(nsId) + "/" + TEST_DIR;
}
/**
+ * Get the federated test file for a nameservice.
+ * @param nsId Nameservice identifier.
* @return example:
* <ul>
* <li>/ns0/testfile which maps to ns0->/target-ns0/testfile
* </ul>
*/
- public String getFederatedTestFileForNameservice(String ns) {
- return getFederatedPathForNameservice(ns) + "/" + TEST_FILE;
+ public String getFederatedTestFileForNS(String nsId) {
+ return getFederatedPathForNS(nsId) + "/" + TEST_FILE;
}
/**
+ * Get the namenode test file for a nameservice.
+ * @param nsId Nameservice identifier.
* @return example:
* <ul>
* <li>/target-ns0/testfile
* </ul>
*/
- public String getNamenodeTestFileForNameservice(String ns) {
- return getNamenodePathForNameservice(ns) + "/" + TEST_FILE;
+ public String getNamenodeTestFileForNS(String nsId) {
+ return getNamenodePathForNS(nsId) + "/" + TEST_FILE;
}
+ /**
+ * Stop the federated HDFS cluster.
+ */
public void shutdown() {
- cluster.shutdown();
+ if (cluster != null) {
+ cluster.shutdown();
+ }
if (routers != null) {
for (RouterContext context : routers) {
stopRouter(context);
@@ -681,9 +767,12 @@ public class RouterDFSCluster {
}
}
+ /**
+ * Stop a router.
+ * @param router Router context.
+ */
public void stopRouter(RouterContext router) {
try {
-
router.router.shutDown();
int loopCount = 0;
@@ -691,7 +780,7 @@ public class RouterDFSCluster {
loopCount++;
Thread.sleep(1000);
if (loopCount > 20) {
- LOG.error("Unable to shutdown router - " + router.rpcPort);
+ LOG.error("Cannot shutdown router {}", router.rpcPort);
break;
}
}
@@ -714,26 +803,28 @@ public class RouterDFSCluster {
for (String ns : getNameservices()) {
NamenodeContext context = getNamenode(ns, null);
if (!createTestDirectoriesNamenode(context)) {
- throw new IOException("Unable to create test directory for ns - " + ns);
+ throw new IOException("Cannot create test directory for ns " + ns);
}
}
}
public boolean createTestDirectoriesNamenode(NamenodeContext nn)
throws IOException {
- return FederationTestUtils.addDirectory(nn.getFileSystem(),
- getNamenodeTestDirectoryForNameservice(nn.nameserviceId));
+ FileSystem fs = nn.getFileSystem();
+ String testDir = getNamenodeTestDirectoryForNS(nn.nameserviceId);
+ return addDirectory(fs, testDir);
}
public void deleteAllFiles() throws IOException {
// Delete all files via the NNs and verify
for (NamenodeContext context : getNamenodes()) {
- FileStatus[] status = context.getFileSystem().listStatus(new Path("/"));
- for(int i = 0; i <status.length; i++) {
+ FileSystem fs = context.getFileSystem();
+ FileStatus[] status = fs.listStatus(new Path("/"));
+ for (int i = 0; i <status.length; i++) {
Path p = status[i].getPath();
- context.getFileSystem().delete(p, true);
+ fs.delete(p, true);
}
- status = context.getFileSystem().listStatus(new Path("/"));
+ status = fs.listStatus(new Path("/"));
assertEquals(status.length, 0);
}
}
@@ -754,14 +845,34 @@ public class RouterDFSCluster {
MockResolver resolver =
(MockResolver) r.router.getSubclusterResolver();
// create table entries
- for (String ns : nameservices) {
+ for (String nsId : nameservices) {
// Direct path
- resolver.addLocation(getFederatedPathForNameservice(ns), ns,
- getNamenodePathForNameservice(ns));
+ String routerPath = getFederatedPathForNS(nsId);
+ String nnPath = getNamenodePathForNS(nsId);
+ resolver.addLocation(routerPath, nsId, nnPath);
}
- // Root path goes to both NS1
- resolver.addLocation("/", nameservices.get(0), "/");
+ // Root path points to the first nameservice
+ String ns0 = nameservices.get(0);
+ resolver.addLocation("/", ns0, "/");
+ }
+ }
+
+ public MiniDFSCluster getCluster() {
+ return cluster;
+ }
+
+ /**
+ * Wait until the federated cluster is up and ready.
+ * @throws IOException If we cannot wait for the cluster to be up.
+ */
+ public void waitClusterUp() throws IOException {
+ cluster.waitClusterUp();
+ registerNamenodes();
+ try {
+ waitNamenodeRegistration();
+ } catch (Exception e) {
+ throw new IOException("Cannot wait for the namenodes", e);
}
}
}
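Taken together, the typical lifecycle of this test cluster (mirroring the globalSetUp() of TestRouterRpc further down in this patch) is:

  RouterDFSCluster cluster = new RouterDFSCluster(false, 2);
  cluster.startCluster();          // start NNs and DNs, wait until active
  cluster.addRouterOverrides(new RouterConfigBuilder().rpc().build());
  cluster.startRouters();          // one router per namenode
  cluster.registerNamenodes();     // report each NN to every router
  cluster.waitNamenodeRegistration();
  // ... run tests through cluster.getRandomRouter() ...
  cluster.shutdown();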
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
index 8c720c7..d8afb39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -51,6 +52,10 @@ public class TestRouter {
conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
MockResolver.class.getCanonicalName());
+ // Bind to any available port
+ conf.set(DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
+ conf.set(DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
+
// Simulate a co-located NN
conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0");
conf.set("fs.defaultFS", "hdfs://" + "ns0");
@@ -90,7 +95,31 @@ public class TestRouter {
@Test
public void testRouterService() throws InterruptedException, IOException {
+ // Rpc only
+ testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
+
// Run with all services
- testRouterStartup((new RouterConfigBuilder(conf)).build());
+ testRouterStartup(new RouterConfigBuilder(conf).all().build());
+ }
+
+ @Test
+ public void testRouterRestartRpcService() throws IOException {
+
+ // Start
+ Router router = new Router();
+ router.init(new RouterConfigBuilder(conf).rpc().build());
+ router.start();
+
+ // Verify RPC server is running
+ assertNotNull(router.getRpcServerAddress());
+ RouterRpcServer rpcServer = router.getRpcServer();
+ assertNotNull(rpcServer);
+ assertEquals(STATE.STARTED, rpcServer.getServiceState());
+
+ // Stop router and RPC server
+ router.stop();
+ assertEquals(STATE.STOPPED, rpcServer.getServiceState());
+
+ router.close();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
new file mode 100644
index 0000000..af506c9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -0,0 +1,869 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.countContents;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.deleteFile;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileStatus;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
+import static org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.TEST_STRING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service.STATE;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the RPC interface of the {@link Router} implemented by
+ * {@link RouterRpcServer}.
+ */
+public class TestRouterRpc {
+
+ /** Federated HDFS cluster. */
+ private static RouterDFSCluster cluster;
+
+ /** Random Router for this federated cluster. */
+ private RouterContext router;
+
+ /** Random nameservice in the federated cluster. */
+ private String ns;
+ /** First namenode in the nameservice. */
+ private NamenodeContext namenode;
+
+ /** Client interface to the Router. */
+ private ClientProtocol routerProtocol;
+ /** Client interface to the Namenode. */
+ private ClientProtocol nnProtocol;
+
+ /** Filesystem interface to the Router. */
+ private FileSystem routerFS;
+ /** Filesystem interface to the Namenode. */
+ private FileSystem nnFS;
+
+ /** File in the Router. */
+ private String routerFile;
+ /** File in the Namenode. */
+ private String nnFile;
+
+
+ @BeforeClass
+ public static void globalSetUp() throws Exception {
+ cluster = new RouterDFSCluster(false, 2);
+
+ // Start NNs and DNs and wait until ready
+ cluster.startCluster();
+
+ // Start routers with only an RPC service
+ cluster.addRouterOverrides((new RouterConfigBuilder()).rpc().build());
+ cluster.startRouters();
+
+ // Register and verify all NNs with all routers
+ cluster.registerNamenodes();
+ cluster.waitNamenodeRegistration();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ cluster.shutdown();
+ }
+
+ @Before
+ public void testSetup() throws Exception {
+
+ // Create mock locations
+ cluster.installMockLocations();
+
+ // Delete all files via the NNs and verify
+ cluster.deleteAllFiles();
+
+ // Create test fixtures on NN
+ cluster.createTestDirectoriesNamenode();
+
+ // Wait to ensure NN has fully created its test directories
+ Thread.sleep(100);
+
+ // Pick a NS, namenode and router for this test
+ this.router = cluster.getRandomRouter();
+ this.ns = cluster.getRandomNameservice();
+ this.namenode = cluster.getNamenode(ns, null);
+
+ // Handles to the ClientProtocol interface
+ this.routerProtocol = router.getClient().getNamenode();
+ this.nnProtocol = namenode.getClient().getNamenode();
+
+ // Handles to the filesystem client
+ this.nnFS = namenode.getFileSystem();
+ this.routerFS = router.getFileSystem();
+
+ // Create a test file on the NN
+ Random r = new Random();
+ String randomFile = "testfile-" + r.nextInt();
+ this.nnFile =
+ cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile;
+ this.routerFile =
+ cluster.getFederatedTestDirectoryForNS(ns) + "/" + randomFile;
+
+ createFile(nnFS, nnFile, 32);
+ verifyFileExists(nnFS, nnFile);
+ }
+
+ @Test
+ public void testRpcService() throws IOException {
+ Router testRouter = new Router();
+ List<String> nss = cluster.getNameservices();
+ String ns0 = nss.get(0);
+ Configuration routerConfig = cluster.generateRouterConfiguration(ns0, null);
+ RouterRpcServer server = new RouterRpcServer(routerConfig, testRouter,
+ testRouter.getNamenodeResolver(), testRouter.getSubclusterResolver());
+ server.init(routerConfig);
+ assertEquals(STATE.INITED, server.getServiceState());
+ server.start();
+ assertEquals(STATE.STARTED, server.getServiceState());
+ server.stop();
+ assertEquals(STATE.STOPPED, server.getServiceState());
+ server.close();
+ testRouter.close();
+ }
+
+ protected RouterDFSCluster getCluster() {
+ return TestRouterRpc.cluster;
+ }
+
+ protected RouterContext getRouterContext() {
+ return this.router;
+ }
+
+ protected void setRouter(RouterContext r)
+ throws IOException, URISyntaxException {
+ this.router = r;
+ this.routerProtocol = r.getClient().getNamenode();
+ this.routerFS = r.getFileSystem();
+ }
+
+ protected FileSystem getRouterFileSystem() {
+ return this.routerFS;
+ }
+
+ protected FileSystem getNamenodeFileSystem() {
+ return this.nnFS;
+ }
+
+ protected ClientProtocol getRouterProtocol() {
+ return this.routerProtocol;
+ }
+
+ protected ClientProtocol getNamenodeProtocol() {
+ return this.nnProtocol;
+ }
+
+ protected NamenodeContext getNamenode() {
+ return this.namenode;
+ }
+
+ protected void setNamenodeFile(String filename) {
+ this.nnFile = filename;
+ }
+
+ protected String getNamenodeFile() {
+ return this.nnFile;
+ }
+
+ protected void setRouterFile(String filename) {
+ this.routerFile = filename;
+ }
+
+ protected String getRouterFile() {
+ return this.routerFile;
+ }
+
+ protected void setNamenode(NamenodeContext nn)
+ throws IOException, URISyntaxException {
+ this.namenode = nn;
+ this.nnProtocol = nn.getClient().getNamenode();
+ this.nnFS = nn.getFileSystem();
+ }
+
+ protected String getNs() {
+ return this.ns;
+ }
+
+ protected void setNs(String nameservice) {
+ this.ns = nameservice;
+ }
+
+ protected static void compareResponses(
+ ClientProtocol protocol1, ClientProtocol protocol2,
+ Method m, Object[] paramList) {
+
+ Object return1 = null;
+ Exception exception1 = null;
+ try {
+ return1 = m.invoke(protocol1, paramList);
+ } catch (Exception ex) {
+ exception1 = ex;
+ }
+
+ Object return2 = null;
+ Exception exception2 = null;
+ try {
+ return2 = m.invoke(protocol2, paramList);
+ } catch (Exception ex) {
+ exception2 = ex;
+ }
+
+ assertEquals(return1, return2);
+ if (exception1 == null && exception2 == null) {
+ return;
+ }
+
+ assertEquals(
+ exception1.getCause().getClass(),
+ exception2.getCause().getClass());
+ }
+
+ @Test
+ public void testProxyListFiles() throws IOException, InterruptedException,
+ URISyntaxException, NoSuchMethodException, SecurityException {
+
+ // Verify that the root listing is a union of the mount table destinations
+ // and the files stored at all nameservices mounted at the root (ns0 + ns1)
+ //
+ // / -->
+ // /ns0 (from mount table)
+ // /ns1 (from mount table)
+ // all items in / of ns0 (default NS)
+
+ // Collect the mount table entries from the root mount point
+ Set<String> requiredPaths = new TreeSet<>();
+ FileSubclusterResolver fileResolver =
+ router.getRouter().getSubclusterResolver();
+ for (String mount : fileResolver.getMountPoints("/")) {
+ requiredPaths.add(mount);
+ }
+
+ // Collect all files/dirs on the root path of the default NS
+ String defaultNs = cluster.getNameservices().get(0);
+ NamenodeContext nn = cluster.getNamenode(defaultNs, null);
+ FileStatus[] iterator = nn.getFileSystem().listStatus(new Path("/"));
+ for (FileStatus file : iterator) {
+ requiredPaths.add(file.getPath().getName());
+ }
+
+ // Fetch listing
+ DirectoryListing listing =
+ routerProtocol.getListing("/", HdfsFileStatus.EMPTY_NAME, false);
+ Iterator<String> requiredPathsIterator = requiredPaths.iterator();
+ // Match each path returned and verify order returned
+ for(HdfsFileStatus f : listing.getPartialListing()) {
+ String fileName = requiredPathsIterator.next();
+ String currentFile = f.getFullPath(new Path("/")).getName();
+ assertEquals(currentFile, fileName);
+ }
+
+ // Verify the total number of results found/matched
+ assertEquals(requiredPaths.size(), listing.getPartialListing().length);
+
+ // List a path that doesn't exist and validate error response with NN
+ // behavior.
+ Method m = ClientProtocol.class.getMethod(
+ "getListing", String.class, byte[].class, boolean.class);
+ String badPath = "/unknownlocation/unknowndir";
+ compareResponses(routerProtocol, nnProtocol, m,
+ new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
+ }
+
+ @Test
+ public void testProxyListFilesWithConflict()
+ throws IOException, InterruptedException {
+
+ // Add a directory to the namespace that conflicts with a mount point
+ NamenodeContext nn = cluster.getNamenode(ns, null);
+ FileSystem nnFs = nn.getFileSystem();
+ addDirectory(nnFs, cluster.getFederatedTestDirectoryForNS(ns));
+
+ FileSystem routerFs = router.getFileSystem();
+ int initialCount = countContents(routerFs, "/");
+
+ // Root file system now for NS X:
+ // / ->
+ // /ns0 (mount table)
+ // /ns1 (mount table)
+ // /target-ns0 (the target folder for NS0, mapped to /)
+ // /nsX (local directory that duplicates mount table)
+ int newCount = countContents(routerFs, "/");
+ assertEquals(initialCount, newCount);
+
+ // Verify that each root path is readable and contains one test directory
+ assertEquals(1, countContents(routerFs, cluster.getFederatedPathForNS(ns)));
+
+ // Verify that real folder for the ns contains a single test directory
+ assertEquals(1, countContents(nnFs, cluster.getNamenodePathForNS(ns)));
+
+ }
+
+ protected void testRename(RouterContext testRouter, String filename,
+ String renamedFile, boolean exceptionExpected) throws IOException {
+
+ createFile(testRouter.getFileSystem(), filename, 32);
+ // verify
+ verifyFileExists(testRouter.getFileSystem(), filename);
+ // rename
+ boolean exceptionThrown = false;
+ try {
+ DFSClient client = testRouter.getClient();
+ ClientProtocol clientProtocol = client.getNamenode();
+ clientProtocol.rename(filename, renamedFile);
+ } catch (Exception ex) {
+ exceptionThrown = true;
+ }
+ if (exceptionExpected) {
+ // Error was expected
+ assertTrue(exceptionThrown);
+ FileContext fileContext = testRouter.getFileContext();
+ assertTrue(fileContext.delete(new Path(filename), true));
+ } else {
+ // No error was expected
+ assertFalse(exceptionThrown);
+ // verify
+ assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile));
+ // delete
+ FileContext fileContext = testRouter.getFileContext();
+ assertTrue(fileContext.delete(new Path(renamedFile), true));
+ }
+ }
+
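+ /*
+ * Same flow as testRename, but through ClientProtocol#rename2, which takes
+ * Options.Rename flags (e.g. OVERWRITE) and reports failure by throwing an
+ * exception instead of returning false.
+ */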
+ protected void testRename2(RouterContext testRouter, String filename,
+ String renamedFile, boolean exceptionExpected) throws IOException {
+ createFile(testRouter.getFileSystem(), filename, 32);
+ // Verify the file was created
+ assertTrue(verifyFileExists(testRouter.getFileSystem(), filename));
+ // rename
+ boolean exceptionThrown = false;
+ try {
+ DFSClient client = testRouter.getClient();
+ ClientProtocol clientProtocol = client.getNamenode();
+ clientProtocol.rename2(filename, renamedFile, new Options.Rename[] {});
+ } catch (Exception ex) {
+ exceptionThrown = true;
+ }
+ assertEquals(exceptionExpected, exceptionThrown);
+ if (exceptionExpected) {
+ // Error was expected
+ FileContext fileContext = testRouter.getFileContext();
+ assertTrue(fileContext.delete(new Path(filename), true));
+ } else {
+ // verify
+ assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile));
+ // delete
+ FileContext fileContext = testRouter.getFileContext();
+ assertTrue(fileContext.delete(new Path(renamedFile), true));
+ }
+ }
+
+ @Test
+ public void testProxyRenameFiles() throws IOException, InterruptedException {
+
+ // Give the cluster time to settle before renaming across namespaces
+ Thread.sleep(5000);
+ List<String> nss = cluster.getNameservices();
+ String ns0 = nss.get(0);
+ String ns1 = nss.get(1);
+
+ // Rename within the same namespace
+ // /ns0/testdir/testrename -> /ns0/testdir/testrename-append
+ String filename =
+ cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename";
+ String renamedFile = filename + "-append";
+ testRename(router, filename, renamedFile, false);
+ testRename2(router, filename, renamedFile, false);
+
+ // Rename a file to a destination in a different namespace; this fails
+ // because rename is a metadata operation on a single NN and cannot span
+ // subclusters
+ filename = cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename";
+ renamedFile = cluster.getFederatedTestDirectoryForNS(ns1) + "/testrename";
+ testRename(router, filename, renamedFile, true);
+ testRename2(router, filename, renamedFile, true);
+ }
+
+ @Test
+ public void testProxyChownFiles() throws Exception {
+
+ String newUsername = "TestUser";
+ String newGroup = "TestGroup";
+
+ // change owner
+ routerProtocol.setOwner(routerFile, newUsername, newGroup);
+
+ // Verify with NN
+ FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile);
+ assertEquals(newUsername, file.getOwner());
+ assertEquals(newGroup, file.getGroup());
+
+ // Bad request and validate router response matches NN response.
+ Method m = ClientProtocol.class.getMethod("setOwner", String.class,
+ String.class, String.class);
+ String badPath = "/unknownlocation/unknowndir";
+ compareResponses(routerProtocol, nnProtocol, m,
+ new Object[] {badPath, newUsername, newGroup});
+ }
+
+ @Test
+ public void testProxyGetStats() throws Exception {
+
+ long[] combinedData = routerProtocol.getStats();
+
+ // Large enough to hold every index of the NN stats array
+ long[] individualData = new long[10];
+ for (String nameservice : cluster.getNameservices()) {
+ NamenodeContext n = cluster.getNamenode(nameservice, null);
+ DFSClient client = n.getClient();
+ ClientProtocol clientProtocol = client.getNamenode();
+ long[] data = clientProtocol.getStats();
+ for (int i = 0; i < data.length; i++) {
+ individualData[i] += data[i];
+ }
+ assertEquals(combinedData.length, data.length);
+ }
+
+ for (int i = 0; i < combinedData.length && i < individualData.length; i++) {
+ if (i == ClientProtocol.GET_STATS_REMAINING_IDX) {
+ // Skip available storage as this fluctuates in mini cluster
+ continue;
+ }
+ assertEquals(combinedData[i], individualData[i]);
+ }
+ }
+
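+ /*
+ * For reference, the aggregation the test above expects from the router is
+ * roughly an element-wise sum of the per-nameservice stats arrays. This is
+ * an illustration only, not the actual RouterRpcServer implementation.
+ */
+ private static long[] sumStatsSketch(List<long[]> perNamespaceStats) {
+ long[] combined = null;
+ for (long[] stats : perNamespaceStats) {
+ if (combined == null) {
+ combined = new long[stats.length];
+ }
+ for (int i = 0; i < stats.length; i++) {
+ combined[i] += stats[i];
+ }
+ }
+ return combined;
+ }
+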
+ @Test
+ public void testProxyGetDatanodeReport() throws Exception {
+
+ DatanodeInfo[] combinedData =
+ routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
+
+ Set<Integer> individualData = new HashSet<>();
+ for (String nameservice : cluster.getNameservices()) {
+ NamenodeContext n = cluster.getNamenode(nameservice, null);
+ DFSClient client = n.getClient();
+ ClientProtocol clientProtocol = client.getNamenode();
+ DatanodeInfo[] data =
+ clientProtocol.getDatanodeReport(DatanodeReportType.ALL);
+ for (int i = 0; i < data.length; i++) {
+ // Collect unique DNs based on their xfer port (in a mini cluster all
+ // datanodes share the same host, so the port distinguishes them)
+ DatanodeInfo info = data[i];
+ individualData.add(info.getXferPort());
+ }
+ }
+ assertEquals(combinedData.length, individualData.size());
+ }
+
+ @Test
+ public void testProxyGetDatanodeStorageReport()
+ throws IOException, InterruptedException, URISyntaxException {
+
+ DatanodeStorageReport[] combinedData =
+ routerProtocol.getDatanodeStorageReport(DatanodeReportType.ALL);
+
+ Set<String> individualData = new HashSet<>();
+ for (String nameservice : cluster.getNameservices()) {
+ NamenodeContext n = cluster.getNamenode(nameservice, null);
+ DFSClient client = n.getClient();
+ ClientProtocol clientProtocol = client.getNamenode();
+ DatanodeStorageReport[] data =
+ clientProtocol.getDatanodeStorageReport(DatanodeReportType.ALL);
+ for (DatanodeStorageReport report : data) {
+ // Determine unique DN instances
+ DatanodeInfo dn = report.getDatanodeInfo();
+ individualData.add(dn.toString());
+ }
+ }
+ assertEquals(combinedData.length, individualData.size());
+ }
+
+ @Test
+ public void testProxyMkdir() throws Exception {
+
+ // Check the initial folders
+ FileStatus[] filesInitial = routerFS.listStatus(new Path("/"));
+
+ // Create a directory via the router at the root level
+ String dirPath = "/testdir";
+ FsPermission permission = new FsPermission("705");
+ routerProtocol.mkdirs(dirPath, permission, false);
+
+ // Verify the root listing has the item via the router
+ FileStatus[] files = routerFS.listStatus(new Path("/"));
+ assertEquals(Arrays.toString(files) + " should be " +
+ Arrays.toString(filesInitial) + " + " + dirPath,
+ filesInitial.length + 1, files.length);
+ assertTrue(verifyFileExists(routerFS, dirPath));
+
+ // Verify the directory is present in only 1 Namenode
+ int foundCount = 0;
+ for (NamenodeContext n : cluster.getNamenodes()) {
+ if (verifyFileExists(n.getFileSystem(), dirPath)) {
+ foundCount++;
+ }
+ }
+ assertEquals(1, foundCount);
+ assertTrue(deleteFile(routerFS, dirPath));
+
+ // Validate router failure response matches NN failure response.
+ Method m = ClientProtocol.class.getMethod("mkdirs", String.class,
+ FsPermission.class, boolean.class);
+ String badPath = "/unknownlocation/unknowndir";
+ compareResponses(routerProtocol, nnProtocol, m,
+ new Object[] {badPath, permission, false});
+ }
+
+ @Test
+ public void testProxyChmodFiles() throws Exception {
+
+ FsPermission permission = new FsPermission("444");
+
+ // change permissions
+ routerProtocol.setPermission(routerFile, permission);
+
+ // Validate the new permissions via the NN
+ FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile);
+ assertEquals(permission, file.getPermission());
+
+ // Validate router failure response matches NN failure response.
+ Method m = ClientProtocol.class.getMethod(
+ "setPermission", String.class, FsPermission.class);
+ String badPath = "/unknownlocation/unknowndir";
+ compareResponses(routerProtocol, nnProtocol, m,
+ new Object[] {badPath, permission});
+ }
+
+ @Test
+ public void testProxySetReplication() throws Exception {
+
+ // Check current replication via NN
+ FileStatus file = getFileStatus(nnFS, nnFile);
+ assertEquals(1, file.getReplication());
+
+ // increment replication via router
+ routerProtocol.setReplication(routerFile, (short) 2);
+
+ // Verify via NN
+ file = getFileStatus(nnFS, nnFile);
+ assertEquals(2, file.getReplication());
+
+ // Validate router failure response matches NN failure response.
+ Method m = ClientProtocol.class.getMethod(
+ "setReplication", String.class, short.class);
+ String badPath = "/unknownlocation/unknowndir";
+ compareResponses(routerProtocol, nnProtocol, m,
+ new Object[] {badPath, (short) 2});
+ }
+
+ @Test
+ public void testProxyTruncateFile() throws Exception {
+
+ // Check file size via NN
+ FileStatus file = getFileStatus(nnFS, nnFile);
+ assertTrue(file.getLen() > 0);
+
+ // Truncate to 0 bytes via router
+ routerProtocol.truncate(routerFile, 0, "testclient");
+
+ // Verify via NN
+ file = getFileStatus(nnFS, nnFile);
+ assertEquals(0, file.getLen());
+
+ // Validate router failure response matches NN failure response.
+ Method m = ClientProtocol.class.getMethod(
+ "truncate", String.class, long.class, String.class);
+ String badPath = "/unknownlocation/unknowndir";
+ compareResponses(routerProtocol, nnProtocol, m,
+ new Object[] {badPath, (long) 0, "testclient"});
+ }
+
+ @Test
+ public void testProxyGetBlockLocations() throws Exception {
+
+ // Fetch block locations via router
+ LocatedBlocks locations =
+ routerProtocol.getBlockLocations(routerFile, 0, 1024);
+ assertEquals(1, locations.getLocatedBlocks().size());
+
+ // Validate router failure response matches NN failure response.
+ Method m = ClientProtocol.class.getMethod(
+ "getBlockLocations", String.class, long.class, long.class);
+ String badPath = "/unknownlocation/unknowndir";
+ compareResponses(routerProtocol, nnProtocol,
+ m, new Object[] {badPath, (long) 0, (long) 0});
+ }
+
+ @Test
+ public void testProxyStoragePolicy() throws Exception {
+
+ // Query initial policy via NN
+ HdfsFileStatus status = namenode.getClient().getFileInfo(nnFile);
+
+ // Set a random policy via router
+ BlockStoragePolicy[] policies = namenode.getClient().getStoragePolicies();
+ BlockStoragePolicy policy = policies[0];
+
+ Random rand = new Random();
+ while (policy.isCopyOnCreateFile()) {
+ // Pick a non copy-on-create policy
+ int randIndex = rand.nextInt(policies.length);
+ policy = policies[randIndex];
+ }
+ routerProtocol.setStoragePolicy(routerFile, policy.getName());
+
+ // Verify policy via NN
+ HdfsFileStatus newStatus = namenode.getClient().getFileInfo(nnFile);
+ assertEquals(policy.getId(), newStatus.getStoragePolicy());
+ assertTrue(newStatus.getStoragePolicy() != status.getStoragePolicy());
+
+ // Validate router failure response matches NN failure response.
+ Method m = ClientProtocol.class.getMethod("setStoragePolicy", String.class,
+ String.class);
+ String badPath = "/unknownlocation/unknowndir";
+ compareResponses(routerProtocol, nnProtocol,
+ m, new Object[] {badPath, "badpolicy"});
+ }
+
+ @Test
+ public void testProxyGetPreferredBlockSize() throws Exception {
+
+ // Query via NN and Router and verify
+ long namenodeSize = nnProtocol.getPreferredBlockSize(nnFile);
+ long routerSize = routerProtocol.getPreferredBlockSize(routerFile);
+ assertEquals(namenodeSize, routerSize);
+
+ // Validate router failure response matches NN failure response.
+ Method m = ClientProtocol.class.getMethod(
+ "getPreferredBlockSize", String.class);
+ String badPath = "/unknownlocation/unknowndir";
+ compareResponses(
+ routerProtocol, nnProtocol, m, new Object[] {badPath});
+ }
+
+ private void testConcat(
+ String source, String target, boolean failureExpected) {
+ boolean failure = false;
+ try {
+ // Concat the test file with a full-block-length file via the router
+ routerProtocol.concat(target, new String[] {source});
+ } catch (IOException ex) {
+ failure = true;
+ }
+ assertEquals(failureExpected, failure);
+ }
+
+ @Test
+ public void testProxyConcatFile() throws Exception {
+
+ // Create a stub file in the primary ns
+ String sameNameservice = ns;
+ String existingFile =
+ cluster.getFederatedTestDirectoryForNS(sameNameservice) +
+ "_concatfile";
+ int existingFileSize = 32;
+ createFile(routerFS, existingFile, existingFileSize);
+
+ // Identify an alternate nameservice that doesn't match the existing file
+ String alternateNameservice = null;
+ for (String n : cluster.getNameservices()) {
+ if (!n.equals(sameNameservice)) {
+ alternateNameservice = n;
+ break;
+ }
+ }
+
+ // Create new files; each must be a full block long for concat to accept
+ // it. One file is in the same namespace as the target file, the other is
+ // in a different namespace.
+ String altRouterFile =
+ cluster.getFederatedTestDirectoryForNS(alternateNameservice) +
+ "_newfile";
+ String sameRouterFile =
+ cluster.getFederatedTestDirectoryForNS(sameNameservice) +
+ "_newfile";
+ createFile(routerFS, altRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+ createFile(routerFS, sameRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+
+ // Concat across different namespaces fails
+ testConcat(existingFile, altRouterFile, true);
+
+ // Concat within the same namespace succeeds
+ testConcat(existingFile, sameRouterFile, false);
+
+ // Check target file length
+ FileStatus status = getFileStatus(routerFS, sameRouterFile);
+ assertEquals(
+ existingFileSize + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT,
+ status.getLen());
+
+ // Validate router failure response matches NN failure response.
+ Method m = ClientProtocol.class.getMethod(
+ "concat", String.class, String[].class);
+ String badPath = "/unknownlocation/unknowndir";
+ compareResponses(routerProtocol, nnProtocol, m,
+ new Object[] {badPath, new String[] {routerFile}});
+ }
+
+ @Test
+ public void testProxyAppend() throws Exception {
+
+ // Append a test string via router
+ EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND);
+ DFSClient routerClient = getRouterContext().getClient();
+ HdfsDataOutputStream stream =
+ routerClient.append(routerFile, 1024, createFlag, null, null);
+ stream.writeBytes(TEST_STRING);
+ stream.close();
+
+ // Verify file size via NN
+ FileStatus status = getFileStatus(nnFS, nnFile);
+ assertTrue(status.getLen() > TEST_STRING.length());
+
+ // Validate router failure response matches NN failure response.
+ Method m = ClientProtocol.class.getMethod("append", String.class,
+ String.class, EnumSetWritable.class);
+ String badPath = "/unknownlocation/unknowndir";
+ EnumSetWritable<CreateFlag> createFlagWritable =
+ new EnumSetWritable<>(createFlag);
+ compareResponses(routerProtocol, nnProtocol, m,
+ new Object[] {badPath, "testClient", createFlagWritable});
+ }
+
+ @Test
+ public void testProxyGetAdditionalDatanode()
+ throws IOException, InterruptedException, URISyntaxException {
+
+ // Use primitive APIs to open a file, add a block, and get datanode location
+ EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE);
+ String clientName = getRouterContext().getClient().getClientName();
+ String newRouterFile = routerFile + "_additionalDatanode";
+ HdfsFileStatus status = routerProtocol.create(
+ newRouterFile, new FsPermission("777"), clientName,
+ new EnumSetWritable<CreateFlag>(createFlag), true, (short) 1,
+ (long) 1024, CryptoProtocolVersion.supported(), null);
+
+ // Add a block via router (requires client to have same lease)
+ LocatedBlock block = routerProtocol.addBlock(
+ newRouterFile, clientName, null, null,
+ status.getFileId(), null, null);
+
+ DatanodeInfo[] exclusions = new DatanodeInfo[0];
+ LocatedBlock newBlock = routerProtocol.getAdditionalDatanode(
+ newRouterFile, status.getFileId(), block.getBlock(),
+ block.getLocations(), block.getStorageIDs(), exclusions, 1, clientName);
+ assertNotNull(newBlock);
+ }
+
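+ /*
+ * Note on the flow above: create() opens the file and takes a lease for
+ * clientName, addBlock() allocates the first block under that lease, and
+ * getAdditionalDatanode() asks the NN (through the router) for a
+ * replacement node, as a client would during write-pipeline recovery.
+ */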
+ @Test
+ public void testProxyCreateFileAlternateUser()
+ throws IOException, URISyntaxException, InterruptedException {
+
+ // Create via Router
+ String routerDir = cluster.getFederatedTestDirectoryForNS(ns);
+ String namenodeDir = cluster.getNamenodeTestDirectoryForNS(ns);
+ String newRouterFile = routerDir + "/unknownuser";
+ String newNamenodeFile = namenodeDir + "/unknownuser";
+ String username = "unknownuser";
+
+ // Allow all user access to dir
+ namenode.getFileContext().setPermission(
+ new Path(namenodeDir), new FsPermission("777"));
+
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(username);
+ DFSClient client = getRouterContext().getClient(ugi);
+ client.create(newRouterFile, true);
+
+ // Fetch via NN and check user
+ FileStatus status = getFileStatus(nnFS, newNamenodeFile);
+ assertEquals(username, status.getOwner());
+ }
+
+ @Test
+ public void testProxyGetFileInfoAccessException() throws IOException {
+
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser("unknownuser");
+
+ // List files from the NN and trap the exception
+ Exception nnFailure = null;
+ try {
+ String testFile = cluster.getNamenodeTestFileForNS(ns);
+ namenode.getClient(ugi).getLocatedBlocks(testFile, 0);
+ } catch (Exception e) {
+ nnFailure = e;
+ }
+ assertNotNull(nnFailure);
+
+ // List files from the router and trap the exception
+ Exception routerFailure = null;
+ try {
+ String testFile = cluster.getFederatedTestFileForNS(ns);
+ getRouterContext().getClient(ugi).getLocatedBlocks(testFile, 0);
+ } catch (Exception e) {
+ routerFailure = e;
+ }
+ assertNotNull(routerFailure);
+
+ assertEquals(nnFailure.getClass(), routerFailure.getClass());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
new file mode 100644
index 0000000..5489691
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+
+/**
+ * Tests the RPC interface of the {@link Router} implemented by
+ * {@link RouterRpcServer} for mount points with multiple destinations.
+ */
+public class TestRouterRpcMultiDestination extends TestRouterRpc {
+
+ @Override
+ public void testSetup() throws Exception {
+
+ RouterDFSCluster cluster = getCluster();
+
+ // Create mock locations
+ getCluster().installMockLocations();
+ List<RouterContext> routers = cluster.getRouters();
+
+ // Add an extra location to the root mount / so that it points to:
+ // /
+ // ns0 -> /
+ // ns1 -> /
+ for (RouterContext rc : routers) {
+ Router router = rc.getRouter();
+ MockResolver resolver = (MockResolver) router.getSubclusterResolver();
+ resolver.addLocation("/", cluster.getNameservices().get(1), "/");
+ }
+
+ // Create a mount that points to 2 dirs in the same ns:
+ // /same
+ // ns0 -> /
+ // ns0 -> /target-ns0
+ for (RouterContext rc : routers) {
+ Router router = rc.getRouter();
+ MockResolver resolver = (MockResolver) router.getSubclusterResolver();
+ List<String> nss = cluster.getNameservices();
+ String ns0 = nss.get(0);
+ resolver.addLocation("/same", ns0, "/");
+ resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0));
+ }
+
+ // Delete all files via the NNs and verify
+ cluster.deleteAllFiles();
+
+ // Create test fixtures on NN
+ cluster.createTestDirectoriesNamenode();
+
+ // Wait to ensure NN has fully created its test directories
+ Thread.sleep(100);
+
+ // Pick a NS, a namenode and a router for this test
+ RouterContext router = cluster.getRandomRouter();
+ this.setRouter(router);
+
+ String ns = cluster.getRandomNameservice();
+ this.setNs(ns);
+ this.setNamenode(cluster.getNamenode(ns, null));
+
+ // Create a test file on a single NN that is accessed via a router path
+ // with 2 destinations. All tests should fail over to the alternate
+ // destination if the wrong NN is attempted first.
+ Random r = new Random();
+ String randomString = "testfile-" + r.nextInt();
+ setNamenodeFile("/" + randomString);
+ setRouterFile("/" + randomString);
+
+ FileSystem nnFs = getNamenodeFileSystem();
+ FileSystem routerFs = getRouterFileSystem();
+ createFile(nnFs, getNamenodeFile(), 32);
+
+ verifyFileExists(nnFs, getNamenodeFile());
+ verifyFileExists(routerFs, getRouterFile());
+ }
+
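+ /*
+ * For reference, the mount table built in testSetup resolves as follows
+ * (a hypothetical walk-through of the MockResolver calls above, using the
+ * ns:::path notation from the comments below):
+ *
+ * getDestinationForPath("/") -> [ns0:::/, ns1:::/]
+ * getDestinationForPath("/same") -> [ns0:::/, ns0:::/target-ns0]
+ */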
+ private void testListing(String path) throws IOException {
+
+ // Collect the mount table entries for this path
+ Set<String> requiredPaths = new TreeSet<>();
+ RouterContext rc = getRouterContext();
+ Router router = rc.getRouter();
+ FileSubclusterResolver subclusterResolver = router.getSubclusterResolver();
+ for (String mount : subclusterResolver.getMountPoints(path)) {
+ requiredPaths.add(mount);
+ }
+
+ // Get files/dirs from the Namenodes
+ PathLocation location = subclusterResolver.getDestinationForPath(path);
+ for (RemoteLocation loc : location.getDestinations()) {
+ String nsId = loc.getNameserviceId();
+ String dest = loc.getDest();
+ NamenodeContext nn = getCluster().getNamenode(nsId, null);
+ FileSystem fs = nn.getFileSystem();
+ FileStatus[] files = fs.listStatus(new Path(dest));
+ for (FileStatus file : files) {
+ String pathName = file.getPath().getName();
+ requiredPaths.add(pathName);
+ }
+ }
+
+ // Get files/dirs from the Router
+ DirectoryListing listing =
+ getRouterProtocol().getListing(path, HdfsFileStatus.EMPTY_NAME, false);
+ Iterator<String> requiredPathsIterator = requiredPaths.iterator();
+
+ // Match each returned path and verify the order
+ HdfsFileStatus[] partialListing = listing.getPartialListing();
+ for (HdfsFileStatus fileStatus : partialListing) {
+ String fileName = requiredPathsIterator.next();
+ String currentFile = fileStatus.getFullPath(new Path(path)).getName();
+ assertEquals(fileName, currentFile);
+ }
+
+ // Verify the total number of results found/matched
+ assertEquals(
+ requiredPaths + " doesn't match " + Arrays.toString(partialListing),
+ requiredPaths.size(), partialListing.length);
+ }
+
+ @Override
+ public void testProxyListFiles() throws IOException, InterruptedException,
+ URISyntaxException, NoSuchMethodException, SecurityException {
+
+ // Verify that the root listing is a union of the mount table destinations
+ // and the files stored at all nameservices mounted at the root (ns0 + ns1)
+ // / -->
+ // /ns0 (from mount table)
+ // /ns1 (from mount table)
+ // /same (from mount table)
+ // (all items in / of ns0, from the mapping / -> ns0:::/)
+ // (all items in / of ns1, from the mapping / -> ns1:::/)
+ testListing("/");
+
+ // Verify that the "/same" mount point lists the contents of both dirs in
+ // the same ns
+ // /same -->
+ // /target-ns0 (from root of ns0)
+ // /testdir (from contents of /target-ns0)
+ testListing("/same");
+
+ // List a non-existing path and validate error response with NN behavior
+ ClientProtocol namenodeProtocol =
+ getCluster().getRandomNamenode().getClient().getNamenode();
+ Method m = ClientProtocol.class.getMethod(
+ "getListing", String.class, byte[].class, boolean.class);
+ String badPath = "/unknownlocation/unknowndir";
+ compareResponses(getRouterProtocol(), namenodeProtocol, m,
+ new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
+ }
+
+ @Override
+ public void testProxyRenameFiles() throws IOException, InterruptedException {
+
+ super.testProxyRenameFiles();
+
+ List<String> nss = getCluster().getNameservices();
+ String ns0 = nss.get(0);
+ String ns1 = nss.get(1);
+
+ // Rename a file from ns0 into the root (mapped to both ns0 and ns1)
+ String testDir0 = getCluster().getFederatedTestDirectoryForNS(ns0);
+ String filename0 = testDir0 + "/testrename";
+ String renamedFile = "/testrename";
+ testRename(getRouterContext(), filename0, renamedFile, false);
+ testRename2(getRouterContext(), filename0, renamedFile, false);
+
+ // Rename a file from ns1 into the root (mapped to both ns0 and ns1)
+ String testDir1 = getCluster().getFederatedTestDirectoryForNS(ns1);
+ String filename1 = testDir1 + "/testrename";
+ testRename(getRouterContext(), filename1, renamedFile, false);
+ testRename2(getRouterContext(), filename1, renamedFile, false);
+ }
+}
\ No newline at end of file