You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/05/02 22:06:04 UTC
[47/50] [abbrv] hadoop git commit: HDFS-10629. Federation Roter.
Contributed by Jason Kace and Inigo Goiri.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63b74541/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
new file mode 100644
index 0000000..ee6f57d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodePriorityComparator;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.util.Time;
+
+/**
+ * In-memory cache/mock of a namenode and file resolver. Stores the most
+ * recently updated NN information for each nameservice and block pool. Also
+ * stores a virtual mount table for resolving global namespace paths to local NN
+ * paths.
+ */
+public class MockResolver
+ implements ActiveNamenodeResolver, FileSubclusterResolver {
+
+ private Map<String, List<? extends FederationNamenodeContext>> resolver =
+ new HashMap<String, List<? extends FederationNamenodeContext>>();
+ private Map<String, List<RemoteLocation>> locations =
+ new HashMap<String, List<RemoteLocation>>();
+ private Set<FederationNamespaceInfo> namespaces =
+ new HashSet<FederationNamespaceInfo>();
+ private String defaultNamespace = null;
+
+ public MockResolver(Configuration conf, StateStoreService store) {
+ this.cleanRegistrations();
+ }
+
+ public void addLocation(String mount, String nameservice, String location) {
+ RemoteLocation remoteLocation = new RemoteLocation(nameservice, location);
+ List<RemoteLocation> locationsList = locations.get(mount);
+ if (locationsList == null) {
+ locationsList = new LinkedList<RemoteLocation>();
+ locations.put(mount, locationsList);
+ }
+ if (!locationsList.contains(remoteLocation)) {
+ locationsList.add(remoteLocation);
+ }
+
+ if (this.defaultNamespace == null) {
+ this.defaultNamespace = nameservice;
+ }
+ }
+
+ public synchronized void cleanRegistrations() {
+ this.resolver =
+ new HashMap<String, List<? extends FederationNamenodeContext>>();
+ this.namespaces = new HashSet<FederationNamespaceInfo>();
+ }
+
+ @Override
+ public void updateActiveNamenode(
+ String ns, InetSocketAddress successfulAddress) {
+
+ String address = successfulAddress.getHostName() + ":" +
+ successfulAddress.getPort();
+ String key = ns;
+ if (key != null) {
+ // Update the active entry
+ @SuppressWarnings("unchecked")
+ List<FederationNamenodeContext> iterator =
+ (List<FederationNamenodeContext>) resolver.get(key);
+ for (FederationNamenodeContext namenode : iterator) {
+ if (namenode.getRpcAddress().equals(address)) {
+ MockNamenodeContext nn = (MockNamenodeContext) namenode;
+ nn.setState(FederationNamenodeServiceState.ACTIVE);
+ break;
+ }
+ }
+ Collections.sort(iterator, new NamenodePriorityComparator());
+ }
+ }
+
+ @Override
+ public List<? extends FederationNamenodeContext>
+ getNamenodesForNameserviceId(String nameserviceId) {
+ return resolver.get(nameserviceId);
+ }
+
+ @Override
+ public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
+ String blockPoolId) {
+ return resolver.get(blockPoolId);
+ }
+
+ private static class MockNamenodeContext
+ implements FederationNamenodeContext {
+ private String webAddress;
+ private String rpcAddress;
+ private String serviceAddress;
+ private String lifelineAddress;
+ private String namenodeId;
+ private String nameserviceId;
+ private FederationNamenodeServiceState state;
+ private long dateModified;
+
+ MockNamenodeContext(
+ String rpc, String service, String lifeline, String web,
+ String ns, String nn, FederationNamenodeServiceState state) {
+ this.rpcAddress = rpc;
+ this.serviceAddress = service;
+ this.lifelineAddress = lifeline;
+ this.webAddress = web;
+ this.namenodeId = nn;
+ this.nameserviceId = ns;
+ this.state = state;
+ this.dateModified = Time.now();
+ }
+
+ public void setState(FederationNamenodeServiceState newState) {
+ this.state = newState;
+ this.dateModified = Time.now();
+ }
+
+ @Override
+ public String getRpcAddress() {
+ return rpcAddress;
+ }
+
+ @Override
+ public String getServiceAddress() {
+ return serviceAddress;
+ }
+
+ @Override
+ public String getLifelineAddress() {
+ return lifelineAddress;
+ }
+
+ @Override
+ public String getWebAddress() {
+ return webAddress;
+ }
+
+ @Override
+ public String getNamenodeKey() {
+ return nameserviceId + " " + namenodeId + " " + rpcAddress;
+ }
+
+ @Override
+ public String getNameserviceId() {
+ return nameserviceId;
+ }
+
+ @Override
+ public String getNamenodeId() {
+ return namenodeId;
+ }
+
+ @Override
+ public FederationNamenodeServiceState getState() {
+ return state;
+ }
+
+ @Override
+ public long getDateModified() {
+ return dateModified;
+ }
+ }
+
+ @Override
+ public synchronized boolean registerNamenode(NamenodeStatusReport report)
+ throws IOException {
+ MockNamenodeContext context = new MockNamenodeContext(
+ report.getRpcAddress(), report.getServiceAddress(),
+ report.getLifelineAddress(), report.getWebAddress(),
+ report.getNameserviceId(), report.getNamenodeId(), report.getState());
+
+ String nsId = report.getNameserviceId();
+ String bpId = report.getBlockPoolId();
+ String cId = report.getClusterId();
+ @SuppressWarnings("unchecked")
+ List<MockNamenodeContext> existingItems =
+ (List<MockNamenodeContext>) resolver.get(nsId);
+ if (existingItems == null) {
+ existingItems = new ArrayList<MockNamenodeContext>();
+ resolver.put(bpId, existingItems);
+ resolver.put(nsId, existingItems);
+ }
+ boolean added = false;
+ for (int i=0; i<existingItems.size() && !added; i++) {
+ MockNamenodeContext existing = existingItems.get(i);
+ if (existing.getNamenodeKey().equals(context.getNamenodeKey())) {
+ existingItems.set(i, context);
+ added = true;
+ }
+ }
+ if (!added) {
+ existingItems.add(context);
+ }
+ Collections.sort(existingItems, new NamenodePriorityComparator());
+
+ FederationNamespaceInfo info = new FederationNamespaceInfo(bpId, cId, nsId);
+ namespaces.add(info);
+ return true;
+ }
+
+ @Override
+ public Set<FederationNamespaceInfo> getNamespaces() throws IOException {
+ return this.namespaces;
+ }
+
+ @Override
+ public PathLocation getDestinationForPath(String path) throws IOException {
+ String finalPath = null;
+ String nameservice = null;
+ Set<String> namespaceSet = new HashSet<String>();
+ LinkedList<RemoteLocation> remoteLocations =
+ new LinkedList<RemoteLocation>();
+ for(String key : this.locations.keySet()) {
+ if(path.startsWith(key)) {
+ for (RemoteLocation location : this.locations.get(key)) {
+ finalPath = location.getDest() + path.substring(key.length());
+ nameservice = location.getNameserviceId();
+ RemoteLocation remoteLocation =
+ new RemoteLocation(nameservice, finalPath);
+ remoteLocations.add(remoteLocation);
+ namespaceSet.add(nameservice);
+ }
+ break;
+ }
+ }
+ if (remoteLocations.isEmpty()) {
+ // Path isn't supported, mimic resolver behavior.
+ return null;
+ }
+ return new PathLocation(path, remoteLocations, namespaceSet);
+ }
+
+ @Override
+ public List<String> getMountPoints(String path) throws IOException {
+ List<String> mounts = new ArrayList<String>();
+ if (path.equals("/")) {
+ // Mounts only supported under root level
+ for (String mount : this.locations.keySet()) {
+ if (mount.length() > 1) {
+ // Remove leading slash, this is the behavior of the mount tree,
+ // return only names.
+ mounts.add(mount.replace("/", ""));
+ }
+ }
+ }
+ return mounts;
+ }
+
+ @Override
+ public void setRouterId(String router) {
+ }
+
+ @Override
+ public String getDefaultNamespace() {
+ return defaultNamespace;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63b74541/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
new file mode 100644
index 0000000..16d624c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Constructs a router configuration with individual features enabled/disabled.
+ */
+public class RouterConfigBuilder {
+
+ private Configuration conf;
+
+ public RouterConfigBuilder(Configuration configuration) {
+ this.conf = configuration;
+ }
+
+ public RouterConfigBuilder() {
+ this.conf = new Configuration();
+ }
+
+ public Configuration build() {
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63b74541/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
new file mode 100644
index 0000000..55d04ad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
@@ -0,0 +1,767 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service.STATE;
+
+/**
+ * Test utility to mimic a federated HDFS cluster with a router.
+ */
+public class RouterDFSCluster {
+ /**
+ * Router context.
+ */
+ public class RouterContext {
+ private Router router;
+ private FileContext fileContext;
+ private String nameserviceId;
+ private String namenodeId;
+ private int rpcPort;
+ private DFSClient client;
+ private Configuration conf;
+ private URI fileSystemUri;
+
+ public RouterContext(Configuration conf, String ns, String nn)
+ throws URISyntaxException {
+ this.namenodeId = nn;
+ this.nameserviceId = ns;
+ this.conf = conf;
+ router = new Router();
+ router.init(conf);
+ }
+
+ public Router getRouter() {
+ return this.router;
+ }
+
+ public String getNameserviceId() {
+ return this.nameserviceId;
+ }
+
+ public String getNamenodeId() {
+ return this.namenodeId;
+ }
+
+ public int getRpcPort() {
+ return this.rpcPort;
+ }
+
+ public FileContext getFileContext() {
+ return this.fileContext;
+ }
+
+ public void initRouter() throws URISyntaxException {
+ }
+
+ public DistributedFileSystem getFileSystem() throws IOException {
+ DistributedFileSystem fs =
+ (DistributedFileSystem) DistributedFileSystem.get(conf);
+ return fs;
+ }
+
+ public DFSClient getClient(UserGroupInformation user)
+ throws IOException, URISyntaxException, InterruptedException {
+
+ LOG.info("Connecting to router at " + fileSystemUri);
+ return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
+ @Override
+ public DFSClient run() throws IOException {
+ return new DFSClient(fileSystemUri, conf);
+ }
+ });
+ }
+
+ public DFSClient getClient() throws IOException, URISyntaxException {
+
+ if (client == null) {
+ LOG.info("Connecting to router at " + fileSystemUri);
+ client = new DFSClient(fileSystemUri, conf);
+ }
+ return client;
+ }
+ }
+
+ /**
+ * Namenode context.
+ */
+ public class NamenodeContext {
+ private NameNode namenode;
+ private String nameserviceId;
+ private String namenodeId;
+ private FileContext fileContext;
+ private int rpcPort;
+ private int servicePort;
+ private int lifelinePort;
+ private int httpPort;
+ private URI fileSystemUri;
+ private int index;
+ private Configuration conf;
+ private DFSClient client;
+
+ public NamenodeContext(Configuration conf, String ns, String nn,
+ int index) {
+ this.conf = conf;
+ this.namenodeId = nn;
+ this.nameserviceId = ns;
+ this.index = index;
+ }
+
+ public NameNode getNamenode() {
+ return this.namenode;
+ }
+
+ public String getNameserviceId() {
+ return this.nameserviceId;
+ }
+
+ public String getNamenodeId() {
+ return this.namenodeId;
+ }
+
+ public FileContext getFileContext() {
+ return this.fileContext;
+ }
+
+ public void setNamenode(NameNode n) throws URISyntaxException {
+ namenode = n;
+
+ // Store the bound ports and override the default FS with the local NN's
+ // RPC
+ rpcPort = n.getNameNodeAddress().getPort();
+ servicePort = n.getServiceRpcAddress().getPort();
+ lifelinePort = n.getServiceRpcAddress().getPort();
+ httpPort = n.getHttpAddress().getPort();
+ fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort());
+ DistributedFileSystem.setDefaultUri(conf, fileSystemUri);
+
+ try {
+ this.fileContext = FileContext.getFileContext(conf);
+ } catch (UnsupportedFileSystemException e) {
+ this.fileContext = null;
+ }
+ }
+
+ public String getRpcAddress() {
+ return namenode.getNameNodeAddress().getHostName() + ":" + rpcPort;
+ }
+
+ public String getServiceAddress() {
+ return namenode.getServiceRpcAddress().getHostName() + ":" + servicePort;
+ }
+
+ public String getLifelineAddress() {
+ return namenode.getServiceRpcAddress().getHostName() + ":" + lifelinePort;
+ }
+
+ public String getHttpAddress() {
+ return namenode.getHttpAddress().getHostName() + ":" + httpPort;
+ }
+
+ public DistributedFileSystem getFileSystem() throws IOException {
+ DistributedFileSystem fs =
+ (DistributedFileSystem) DistributedFileSystem.get(conf);
+ return fs;
+ }
+
+ public void resetClient() {
+ client = null;
+ }
+
+ public DFSClient getClient(UserGroupInformation user)
+ throws IOException, URISyntaxException, InterruptedException {
+
+ LOG.info("Connecting to namenode at " + fileSystemUri);
+ return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
+ @Override
+ public DFSClient run() throws IOException {
+ return new DFSClient(fileSystemUri, conf);
+ }
+ });
+ }
+
+ public DFSClient getClient() throws IOException, URISyntaxException {
+ if (client == null) {
+ LOG.info("Connecting to namenode at " + fileSystemUri);
+ client = new DFSClient(fileSystemUri, conf);
+ }
+ return client;
+ }
+
+ public String getConfSuffix() {
+ String suffix = nameserviceId;
+ if (highAvailability) {
+ suffix += "." + namenodeId;
+ }
+ return suffix;
+ }
+ }
+
+ public static final String NAMENODE1 = "nn0";
+ public static final String NAMENODE2 = "nn1";
+ public static final String NAMENODE3 = "nn2";
+ public static final String TEST_STRING = "teststring";
+ public static final String TEST_DIR = "testdir";
+ public static final String TEST_FILE = "testfile";
+
+ private List<String> nameservices;
+ private List<RouterContext> routers;
+ private List<NamenodeContext> namenodes;
+ private static final Log LOG = LogFactory.getLog(RouterDFSCluster.class);
+ private MiniDFSCluster cluster;
+ private boolean highAvailability;
+
+ protected static final int DEFAULT_HEARTBEAT_INTERVAL = 5;
+ protected static final int DEFAULT_CACHE_INTERVAL_SEC = 5;
+ private Configuration routerOverrides;
+ private Configuration namenodeOverrides;
+
+ private static final String NAMENODES = NAMENODE1 + "," + NAMENODE2;
+
+ public RouterDFSCluster(boolean ha, int numNameservices) {
+ this(ha, numNameservices, 2);
+ }
+
+ public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) {
+ this.highAvailability = ha;
+ configureNameservices(numNameservices, numNamenodes);
+ }
+
+ public void addRouterOverrides(Configuration conf) {
+ if (this.routerOverrides == null) {
+ this.routerOverrides = conf;
+ } else {
+ this.routerOverrides.addResource(conf);
+ }
+ }
+
+ public void addNamenodeOverrides(Configuration conf) {
+ if (this.namenodeOverrides == null) {
+ this.namenodeOverrides = conf;
+ } else {
+ this.namenodeOverrides.addResource(conf);
+ }
+ }
+
+ public Configuration generateNamenodeConfiguration(
+ String defaultNameserviceId) {
+ Configuration c = new HdfsConfiguration();
+
+ c.set(DFSConfigKeys.DFS_NAMESERVICES, getNameservicesKey());
+ c.set("fs.defaultFS", "hdfs://" + defaultNameserviceId);
+
+ for (String ns : nameservices) {
+ if (highAvailability) {
+ c.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, NAMENODES);
+ }
+
+ for (NamenodeContext context : getNamenodes(ns)) {
+ String suffix = context.getConfSuffix();
+
+ c.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
+ "127.0.0.1:" + context.rpcPort);
+ c.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
+ "127.0.0.1:" + context.httpPort);
+ c.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
+ "0.0.0.0");
+ }
+ }
+
+ if (namenodeOverrides != null) {
+ c.addResource(namenodeOverrides);
+ }
+ return c;
+ }
+
+ public Configuration generateClientConfiguration() {
+ Configuration conf = new HdfsConfiguration();
+ conf.addResource(generateNamenodeConfiguration(getNameservices().get(0)));
+ return conf;
+ }
+
+ public Configuration generateRouterConfiguration(String localNameserviceId,
+ String localNamenodeId) throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ conf.addResource(generateNamenodeConfiguration(localNameserviceId));
+
+ // Use mock resolver classes
+ conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+ MockResolver.class.getCanonicalName());
+ conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+ MockResolver.class.getCanonicalName());
+
+ // Set the nameservice ID for the default NN monitor
+ conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, localNameserviceId);
+
+ if (localNamenodeId != null) {
+ conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, localNamenodeId);
+ }
+
+ StringBuilder routerBuilder = new StringBuilder();
+ for (String ns : nameservices) {
+ for (NamenodeContext context : getNamenodes(ns)) {
+ String suffix = context.getConfSuffix();
+
+ if (routerBuilder.length() != 0) {
+ routerBuilder.append(",");
+ }
+ routerBuilder.append(suffix);
+ }
+ }
+
+ return conf;
+ }
+
+ public void configureNameservices(int numNameservices, int numNamenodes) {
+ nameservices = new ArrayList<String>();
+ for (int i = 0; i < numNameservices; i++) {
+ nameservices.add("ns" + i);
+ }
+ namenodes = new ArrayList<NamenodeContext>();
+ int index = 0;
+ for (String ns : nameservices) {
+ Configuration nnConf = generateNamenodeConfiguration(ns);
+ if (highAvailability) {
+ NamenodeContext context =
+ new NamenodeContext(nnConf, ns, NAMENODE1, index);
+ namenodes.add(context);
+ index++;
+
+ if (numNamenodes > 1) {
+ context = new NamenodeContext(nnConf, ns, NAMENODE2, index + 1);
+ namenodes.add(context);
+ index++;
+ }
+
+ if (numNamenodes > 2) {
+ context = new NamenodeContext(nnConf, ns, NAMENODE3, index + 1);
+ namenodes.add(context);
+ index++;
+ }
+
+ } else {
+ NamenodeContext context = new NamenodeContext(nnConf, ns, null, index);
+ namenodes.add(context);
+ index++;
+ }
+ }
+ }
+
+ public String getNameservicesKey() {
+ StringBuilder ns = new StringBuilder();
+ for (int i = 0; i < nameservices.size(); i++) {
+ if (i > 0) {
+ ns.append(",");
+ }
+ ns.append(nameservices.get(i));
+ }
+ return ns.toString();
+ }
+
+ public String getRandomNameservice() {
+ Random r = new Random();
+ return nameservices.get(r.nextInt(nameservices.size()));
+ }
+
+ public List<String> getNameservices() {
+ return nameservices;
+ }
+
+ public List<NamenodeContext> getNamenodes(String nameservice) {
+ ArrayList<NamenodeContext> nns = new ArrayList<NamenodeContext>();
+ for (NamenodeContext c : namenodes) {
+ if (c.nameserviceId.equals(nameservice)) {
+ nns.add(c);
+ }
+ }
+ return nns;
+ }
+
+ public NamenodeContext getRandomNamenode() {
+ Random rand = new Random();
+ return namenodes.get(rand.nextInt(namenodes.size()));
+ }
+
+ public List<NamenodeContext> getNamenodes() {
+ return namenodes;
+ }
+
+ public boolean isHighAvailability() {
+ return highAvailability;
+ }
+
+ public NamenodeContext getNamenode(String nameservice,
+ String namenode) {
+ for (NamenodeContext c : namenodes) {
+ if (c.nameserviceId.equals(nameservice)) {
+ if (namenode == null || c.namenodeId == null || namenode.isEmpty()
+ || c.namenodeId.isEmpty()) {
+ return c;
+ } else if (c.namenodeId.equals(namenode)) {
+ return c;
+ }
+ }
+ }
+ return null;
+ }
+
+ public List<RouterContext> getRouters(String nameservice) {
+ ArrayList<RouterContext> nns = new ArrayList<RouterContext>();
+ for (RouterContext c : routers) {
+ if (c.nameserviceId.equals(nameservice)) {
+ nns.add(c);
+ }
+ }
+ return nns;
+ }
+
+ public RouterContext getRouterContext(String nameservice,
+ String namenode) {
+ for (RouterContext c : routers) {
+ if (namenode == null) {
+ return c;
+ }
+ if (c.namenodeId.equals(namenode)
+ && c.nameserviceId.equals(nameservice)) {
+ return c;
+ }
+ }
+ return null;
+ }
+
+ public RouterContext getRandomRouter() {
+ Random rand = new Random();
+ return routers.get(rand.nextInt(routers.size()));
+ }
+
+ public List<RouterContext> getRouters() {
+ return routers;
+ }
+
+ public RouterContext buildRouter(String nameservice, String namenode)
+ throws URISyntaxException, IOException {
+ Configuration config = generateRouterConfiguration(nameservice, namenode);
+ RouterContext rc = new RouterContext(config, nameservice, namenode);
+ return rc;
+ }
+
+ public void startCluster() {
+ startCluster(null);
+ }
+
+ public void startCluster(Configuration overrideConf) {
+ try {
+ MiniDFSNNTopology topology = new MiniDFSNNTopology();
+ for (String ns : nameservices) {
+
+ NSConf conf = new MiniDFSNNTopology.NSConf(ns);
+ if (highAvailability) {
+ for(int i = 0; i < namenodes.size()/nameservices.size(); i++) {
+ NNConf nnConf = new MiniDFSNNTopology.NNConf("nn" + i);
+ conf.addNN(nnConf);
+ }
+ } else {
+ NNConf nnConf = new MiniDFSNNTopology.NNConf(null);
+ conf.addNN(nnConf);
+ }
+ topology.addNameservice(conf);
+ }
+ topology.setFederation(true);
+
+ // Start mini DFS cluster
+ Configuration nnConf = generateNamenodeConfiguration(nameservices.get(0));
+ if (overrideConf != null) {
+ nnConf.addResource(overrideConf);
+ }
+ cluster = new MiniDFSCluster.Builder(nnConf).nnTopology(topology).build();
+ cluster.waitActive();
+
+ // Store NN pointers
+ for (int i = 0; i < namenodes.size(); i++) {
+ NameNode nn = cluster.getNameNode(i);
+ namenodes.get(i).setNamenode(nn);
+ }
+
+ } catch (Exception e) {
+ LOG.error("Cannot start Router DFS cluster: " + e.getMessage(), e);
+ cluster.shutdown();
+ }
+ }
+
+ public void startRouters()
+ throws InterruptedException, URISyntaxException, IOException {
+ // Create routers
+ routers = new ArrayList<RouterContext>();
+ for (String ns : nameservices) {
+
+ for (NamenodeContext context : getNamenodes(ns)) {
+ routers.add(buildRouter(ns, context.namenodeId));
+ }
+ }
+
+ // Start all routers
+ for (RouterContext router : routers) {
+ router.router.start();
+ }
+ // Wait until all routers are active and record their ports
+ for (RouterContext router : routers) {
+ waitActive(router);
+ router.initRouter();
+ }
+ }
+
+ public void waitActive(NamenodeContext nn) throws IOException {
+ cluster.waitActive(nn.index);
+ }
+
+ public void waitActive(RouterContext router)
+ throws InterruptedException {
+ for (int loopCount = 0; loopCount < 20; loopCount++) {
+ // Validate connection of routers to NNs
+ if (router.router.getServiceState() == STATE.STARTED) {
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ assertFalse(
+ "Timeout waiting for " + router.router.toString() + " to activate.",
+ true);
+ }
+
+
+ public void registerNamenodes() throws IOException {
+ for (RouterContext r : routers) {
+ ActiveNamenodeResolver resolver = r.router.getNamenodeResolver();
+ for (NamenodeContext nn : namenodes) {
+ // Generate a report
+ NamenodeStatusReport report = new NamenodeStatusReport(nn.nameserviceId,
+ nn.namenodeId, nn.getRpcAddress(), nn.getServiceAddress(),
+ nn.getLifelineAddress(), nn.getHttpAddress());
+ report.setNamespaceInfo(nn.namenode.getNamesystem().getFSImage()
+ .getStorage().getNamespaceInfo());
+
+ // Determine HA state from nn public state string
+ String nnState = nn.namenode.getState();
+ HAServiceState haState = HAServiceState.ACTIVE;
+ for (HAServiceState state : HAServiceState.values()) {
+ if (nnState.equalsIgnoreCase(state.name())) {
+ haState = state;
+ break;
+ }
+ }
+ report.setHAServiceState(haState);
+
+ // Register with the resolver
+ resolver.registerNamenode(report);
+ }
+ }
+ }
+
+ public void waitNamenodeRegistration()
+ throws InterruptedException, IllegalStateException, IOException {
+ for (RouterContext r : routers) {
+ for (NamenodeContext nn : namenodes) {
+ FederationTestUtils.waitNamenodeRegistered(
+ r.router.getNamenodeResolver(), nn.nameserviceId, nn.namenodeId,
+ null);
+ }
+ }
+ }
+
+ public void waitRouterRegistrationQuorum(RouterContext router,
+ FederationNamenodeServiceState state, String nameservice, String namenode)
+ throws InterruptedException, IOException {
+ LOG.info("Waiting for NN - " + nameservice + ":" + namenode
+ + " to transition to state - " + state);
+ FederationTestUtils.waitNamenodeRegistered(
+ router.router.getNamenodeResolver(), nameservice, namenode, state);
+ }
+
+ public String getFederatedPathForNameservice(String ns) {
+ return "/" + ns;
+ }
+
+ public String getNamenodePathForNameservice(String ns) {
+ return "/target-" + ns;
+ }
+
+ /**
+ * @return example:
+ * <ul>
+ * <li>/ns0/testdir which maps to ns0->/target-ns0/testdir
+ * </ul>
+ */
+ public String getFederatedTestDirectoryForNameservice(String ns) {
+ return getFederatedPathForNameservice(ns) + "/" + TEST_DIR;
+ }
+
+ /**
+ * @return example:
+ * <ul>
+ * <li>/target-ns0/testdir
+ * </ul>
+ */
+ public String getNamenodeTestDirectoryForNameservice(String ns) {
+ return getNamenodePathForNameservice(ns) + "/" + TEST_DIR;
+ }
+
+ /**
+ * @return example:
+ * <ul>
+ * <li>/ns0/testfile which maps to ns0->/target-ns0/testfile
+ * </ul>
+ */
+ public String getFederatedTestFileForNameservice(String ns) {
+ return getFederatedPathForNameservice(ns) + "/" + TEST_FILE;
+ }
+
+ /**
+ * @return example:
+ * <ul>
+ * <li>/target-ns0/testfile
+ * </ul>
+ */
+ public String getNamenodeTestFileForNameservice(String ns) {
+ return getNamenodePathForNameservice(ns) + "/" + TEST_FILE;
+ }
+
+ public void shutdown() {
+ cluster.shutdown();
+ if (routers != null) {
+ for (RouterContext context : routers) {
+ stopRouter(context);
+ }
+ }
+ }
+
+ public void stopRouter(RouterContext router) {
+ try {
+
+ router.router.shutDown();
+
+ int loopCount = 0;
+ while (router.router.getServiceState() != STATE.STOPPED) {
+ loopCount++;
+ Thread.sleep(1000);
+ if (loopCount > 20) {
+ LOG.error("Unable to shutdown router - " + router.rpcPort);
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // Namespace Test Fixtures
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Creates test directories via the namenode.
+ * 1) /target-ns0/testfile
+ * 2) /target-ns1/testfile
+ * @throws IOException
+ */
+ public void createTestDirectoriesNamenode() throws IOException {
+ // Add a test dir to each NS and verify
+ for (String ns : getNameservices()) {
+ NamenodeContext context = getNamenode(ns, null);
+ if (!createTestDirectoriesNamenode(context)) {
+ throw new IOException("Unable to create test directory for ns - " + ns);
+ }
+ }
+ }
+
+ public boolean createTestDirectoriesNamenode(NamenodeContext nn)
+ throws IOException {
+ return FederationTestUtils.addDirectory(nn.getFileSystem(),
+ getNamenodeTestDirectoryForNameservice(nn.nameserviceId));
+ }
+
+ public void deleteAllFiles() throws IOException {
+ // Delete all files via the NNs and verify
+ for (NamenodeContext context : getNamenodes()) {
+ FileStatus[] status = context.getFileSystem().listStatus(new Path("/"));
+ for(int i = 0; i <status.length; i++) {
+ Path p = status[i].getPath();
+ context.getFileSystem().delete(p, true);
+ }
+ status = context.getFileSystem().listStatus(new Path("/"));
+ assertEquals(status.length, 0);
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // MockRouterResolver Test Fixtures
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * <ul>
+ * <li>/ -> [ns0->/].
+ * <li>/nso -> ns0->/target-ns0.
+ * <li>/ns1 -> ns1->/target-ns1.
+ * </ul>
+ */
+ public void installMockLocations() {
+ for (RouterContext r : routers) {
+ MockResolver resolver =
+ (MockResolver) r.router.getSubclusterResolver();
+ // create table entries
+ for (String ns : nameservices) {
+ // Direct path
+ resolver.addLocation(getFederatedPathForNameservice(ns), ns,
+ getNamenodePathForNameservice(ns));
+ }
+
+ // Root path goes to both NS1
+ resolver.addLocation("/", nameservices.get(0), "/");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63b74541/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
new file mode 100644
index 0000000..8c720c7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.service.Service.STATE;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * The the safe mode for the {@link Router} controlled by
+ * {@link SafeModeTimer}.
+ */
+public class TestRouter {
+
+ private static Configuration conf;
+
+ @BeforeClass
+ public static void create() throws IOException {
+ // Basic configuration without the state store
+ conf = new Configuration();
+ // Mock resolver classes
+ conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+ MockResolver.class.getCanonicalName());
+ conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+ MockResolver.class.getCanonicalName());
+
+ // Simulate a co-located NN
+ conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0");
+ conf.set("fs.defaultFS", "hdfs://" + "ns0");
+ conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + "ns0",
+ "127.0.0.1:0" + 0);
+ conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + "ns0",
+ "127.0.0.1:" + 0);
+ conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + "ns0",
+ "0.0.0.0");
+ }
+
+ @AfterClass
+ public static void destroy() {
+ }
+
+ @Before
+ public void setup() throws IOException, URISyntaxException {
+ }
+
+ @After
+ public void cleanup() {
+ }
+
+ private static void testRouterStartup(Configuration routerConfig)
+ throws InterruptedException, IOException {
+ Router router = new Router();
+ assertEquals(STATE.NOTINITED, router.getServiceState());
+ router.init(routerConfig);
+ assertEquals(STATE.INITED, router.getServiceState());
+ router.start();
+ assertEquals(STATE.STARTED, router.getServiceState());
+ router.stop();
+ assertEquals(STATE.STOPPED, router.getServiceState());
+ router.close();
+ }
+
+ @Test
+ public void testRouterService() throws InterruptedException, IOException {
+
+ // Run with all services
+ testRouterStartup((new RouterConfigBuilder(conf)).build());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org