Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/06/30 01:30:35 UTC
[19/50] [abbrv] hadoop git commit: HDFS-6440. Support more than 2
NameNodes. Contributed by Jesse Yates.
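In short, this patch generalizes the HDFS test machinery from a hard-coded pair of HA NameNodes to an arbitrary count. A minimal sketch of the intended usage, built only from APIs introduced below (the test body itself is illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;

public class ThreeNameNodeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // New in this patch: request three NNs instead of the fixed two.
    MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf)
        .setNumNameNodes(3)
        .build();
    try {
      MiniDFSCluster dfsCluster = cluster.getDfsCluster();
      dfsCluster.waitActive();
      dfsCluster.transitionToActive(0);  // nn0 active; nn1 and nn2 standby
      DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
      dfs.mkdirs(new Path("/example"));
    } finally {
      cluster.shutdown();
    }
  }
}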
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index fdbacdc..0a21886 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -62,6 +62,8 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -445,7 +447,7 @@ public class MiniDFSCluster {
final int numNameNodes = builder.nnTopology.countNameNodes();
LOG.info("starting cluster: numNameNodes=" + numNameNodes
+ ", numDataNodes=" + builder.numDataNodes);
- nameNodes = new NameNodeInfo[numNameNodes];
+
this.storagesPerDatanode = builder.storagesPerDatanode;
// Duplicate the storageType setting for each DN.
@@ -515,7 +517,7 @@ public class MiniDFSCluster {
}
private Configuration conf;
- private NameNodeInfo[] nameNodes;
+ private Multimap<String, NameNodeInfo> namenodes = ArrayListMultimap.create();
protected int numDataNodes;
protected final List<DataNodeProperties> dataNodes =
new ArrayList<DataNodeProperties>();
@@ -539,10 +541,10 @@ public class MiniDFSCluster {
* Stores the information related to a namenode in the cluster
*/
public static class NameNodeInfo {
- final NameNode nameNode;
- final Configuration conf;
- final String nameserviceId;
- final String nnId;
+ public NameNode nameNode;
+ Configuration conf;
+ String nameserviceId;
+ String nnId;
StartupOption startOpt;
NameNodeInfo(NameNode nn, String nameserviceId, String nnId,
StartupOption startOpt, Configuration conf) {
@@ -563,7 +565,6 @@ public class MiniDFSCluster {
* without a name node (ie when the name node is started elsewhere).
*/
public MiniDFSCluster() {
- nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
synchronized (MiniDFSCluster.class) {
instanceId = instanceCount++;
@@ -740,7 +741,6 @@ public class MiniDFSCluster {
StartupOption operation,
String[] racks, String hosts[],
long[] simulatedCapacities) throws IOException {
- this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
this.storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
initMiniDFSCluster(conf, numDataNodes, null, format,
manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
@@ -814,7 +814,7 @@ public class MiniDFSCluster {
createNameNodesAndSetConf(
nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
enableManagedDfsDirsRedundancy,
- format, startOpt, clusterId, conf);
+ format, startOpt, clusterId);
} catch (IOException ioe) {
LOG.error("IOE creating namenodes. Permissions dump:\n" +
createPermissionsDiagnosisString(data_dir), ioe);
@@ -871,7 +871,127 @@ public class MiniDFSCluster {
private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs,
boolean enableManagedDfsDirsRedundancy, boolean format,
+ StartupOption operation, String clusterId) throws IOException {
+ // do the basic namenode configuration
+ configureNameNodes(nnTopology, federation, conf);
+
+ int nnCounter = 0;
+ int nsCounter = 0;
+ // configure each NS independently
+ for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
+ configureNameService(nameservice, nsCounter++, manageNameDfsSharedDirs,
+ manageNameDfsDirs, enableManagedDfsDirsRedundancy,
+ format, operation, clusterId, nnCounter);
+ nnCounter += nameservice.getNNs().size();
+ }
+ }
+
+ /**
+ * Do the rest of the NN configuration (shared edits, directory
+ * formatting, etc.) for a single nameservice.
+ * @param nnCounter the number of namenodes already configured/started; also
+ * acts as the <i>index</i> of the next NN to start (since indices start at 0).
+ * @throws IOException
+ */
+ private void configureNameService(MiniDFSNNTopology.NSConf nameservice, int nsCounter,
+ boolean manageNameDfsSharedDirs, boolean manageNameDfsDirs, boolean
+ enableManagedDfsDirsRedundancy, boolean format,
StartupOption operation, String clusterId,
+ final int nnCounter) throws IOException {
+ String nsId = nameservice.getId();
+ String lastDefaultFileSystem = null;
+
+ // If HA is enabled on this nameservice, enumerate all the namenodes
+ // in the configuration. Also need to set a shared edits dir
+ int numNNs = nameservice.getNNs().size();
+ if (numNNs > 1 && manageNameDfsSharedDirs) {
+ URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter + numNNs - 1);
+ conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
+ // Clean out the shared edits dir completely, including all subdirectories.
+ FileUtil.fullyDelete(new File(sharedEditsUri));
+ }
+
+ // Now format first NN and copy the storage directory from that node to the others.
+ int nnIndex = nnCounter;
+ Collection<URI> prevNNDirs = null;
+ for (NNConf nn : nameservice.getNNs()) {
+ initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs,
+ manageNameDfsDirs, nnIndex);
+ Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
+ if (format) {
+ // delete the existing namespaces
+ for (URI nameDirUri : namespaceDirs) {
+ File nameDir = new File(nameDirUri);
+ if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
+ throw new IOException("Could not fully delete " + nameDir);
+ }
+ }
+
+ // delete the checkpoint directories, if they exist
+ Collection<URI> checkpointDirs = Util.stringCollectionAsURIs(conf
+ .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY));
+ for (URI checkpointDirUri : checkpointDirs) {
+ File checkpointDir = new File(checkpointDirUri);
+ if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) {
+ throw new IOException("Could not fully delete " + checkpointDir);
+ }
+ }
+ }
+
+ boolean formatThisOne = format;
+ // if this is not the first NN
+ if (nnIndex++ > nnCounter && format) {
+ // Don't format the second, third, etc NN in an HA setup - that
+ // would result in it having a different clusterID,
+ // block pool ID, etc. Instead, copy the name dirs
+ // from the previous one.
+ formatThisOne = false;
+ assert (null != prevNNDirs);
+ copyNameDirs(prevNNDirs, namespaceDirs, conf);
+ }
+
+ if (formatThisOne) {
+ // Allow overriding clusterID for specific NNs to test
+ // misconfiguration.
+ if (nn.getClusterId() == null) {
+ StartupOption.FORMAT.setClusterId(clusterId);
+ } else {
+ StartupOption.FORMAT.setClusterId(nn.getClusterId());
+ }
+ DFSTestUtil.formatNameNode(conf);
+ }
+ prevNNDirs = namespaceDirs;
+ }
+
+ // create all the namenodes in the namespace
+ nnIndex = nnCounter;
+ for (NNConf nn : nameservice.getNNs()) {
+ initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs,
+ enableManagedDfsDirsRedundancy, nnIndex++);
+ NameNodeInfo info = createNameNode(conf, false, operation,
+ clusterId, nsId, nn.getNnId());
+
+ // Record the last namenode uri
+ if (info != null && info.conf != null) {
+ lastDefaultFileSystem =
+ info.conf.get(FS_DEFAULT_NAME_KEY);
+ }
+ }
+ if (!federation && lastDefaultFileSystem != null) {
+ // Set the default file system to the actual bind address of NN.
+ conf.set(FS_DEFAULT_NAME_KEY, lastDefaultFileSystem);
+ }
+ }
+
+ /**
+ * Do the basic NN configuration for the topology. Does not configure things like the shared
+ * edits directories.
+ * @param nnTopology the namenode topology to configure
+ * @param federation whether the cluster is federated
+ * @param conf the configuration to populate with the topology settings
+ * @throws IOException
+ */
+ public static void configureNameNodes(MiniDFSNNTopology nnTopology, boolean federation,
Configuration conf) throws IOException {
Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
"empty NN topology: no namenodes specified!");
@@ -884,22 +1004,21 @@ public class MiniDFSCluster {
// NN is started.
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:" + onlyNN.getIpcPort());
}
-
+
List<String> allNsIds = Lists.newArrayList();
for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
if (nameservice.getId() != null) {
allNsIds.add(nameservice.getId());
}
}
+
if (!allNsIds.isEmpty()) {
conf.set(DFS_NAMESERVICES, Joiner.on(",").join(allNsIds));
}
-
- int nnCounter = 0;
+
for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
String nsId = nameservice.getId();
- String lastDefaultFileSystem = null;
-
+
Preconditions.checkArgument(
!federation || nsId != null,
"if there is more than one NS, they must have names");
@@ -918,85 +1037,10 @@ public class MiniDFSCluster {
// If HA is enabled on this nameservice, enumerate all the namenodes
// in the configuration. Also need to set a shared edits dir
if (nnIds.size() > 1) {
- conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()),
- Joiner.on(",").join(nnIds));
- if (manageNameDfsSharedDirs) {
- URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1);
- conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
- // Clean out the shared edits dir completely, including all subdirectories.
- FileUtil.fullyDelete(new File(sharedEditsUri));
- }
- }
-
- // Now format first NN and copy the storage directory from that node to the others.
- int i = 0;
- Collection<URI> prevNNDirs = null;
- int nnCounterForFormat = nnCounter;
- for (NNConf nn : nameservice.getNNs()) {
- initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, manageNameDfsDirs,
- nnCounterForFormat);
- Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
- if (format) {
- for (URI nameDirUri : namespaceDirs) {
- File nameDir = new File(nameDirUri);
- if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
- throw new IOException("Could not fully delete " + nameDir);
- }
- }
- Collection<URI> checkpointDirs = Util.stringCollectionAsURIs(conf
- .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY));
- for (URI checkpointDirUri : checkpointDirs) {
- File checkpointDir = new File(checkpointDirUri);
- if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) {
- throw new IOException("Could not fully delete " + checkpointDir);
- }
- }
- }
-
- boolean formatThisOne = format;
- if (format && i++ > 0) {
- // Don't format the second NN in an HA setup - that
- // would result in it having a different clusterID,
- // block pool ID, etc. Instead, copy the name dirs
- // from the first one.
- formatThisOne = false;
- assert (null != prevNNDirs);
- copyNameDirs(prevNNDirs, namespaceDirs, conf);
- }
-
- nnCounterForFormat++;
- if (formatThisOne) {
- // Allow overriding clusterID for specific NNs to test
- // misconfiguration.
- if (nn.getClusterId() == null) {
- StartupOption.FORMAT.setClusterId(clusterId);
- } else {
- StartupOption.FORMAT.setClusterId(nn.getClusterId());
- }
- DFSTestUtil.formatNameNode(conf);
- }
- prevNNDirs = namespaceDirs;
- }
-
- // Start all Namenodes
- for (NNConf nn : nameservice.getNNs()) {
- initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs,
- enableManagedDfsDirsRedundancy, nnCounter);
- createNameNode(nnCounter, conf, numDataNodes, false, operation,
- clusterId, nsId, nn.getNnId());
- // Record the last namenode uri
- if (nameNodes[nnCounter] != null && nameNodes[nnCounter].conf != null) {
- lastDefaultFileSystem =
- nameNodes[nnCounter].conf.get(FS_DEFAULT_NAME_KEY);
- }
- nnCounter++;
- }
- if (!federation && lastDefaultFileSystem != null) {
- // Set the default file system to the actual bind address of NN.
- conf.set(FS_DEFAULT_NAME_KEY, lastDefaultFileSystem);
+ conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()), Joiner
+ .on(",").join(nnIds));
}
}
-
}
public URI getSharedEditsDir(int minNN, int maxNN) throws IOException {
@@ -1010,39 +1054,92 @@ public class MiniDFSCluster {
}
public NameNodeInfo[] getNameNodeInfos() {
- return this.nameNodes;
+ return this.namenodes.values().toArray(new NameNodeInfo[0]);
}
- private void initNameNodeConf(Configuration conf,
- String nameserviceId, String nnId,
- boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy,
- int nnIndex) throws IOException {
+ /**
+ * @param nsIndex index of the namespace id to check
+ * @return all the namenodes bound to the given namespace index
+ */
+ public NameNodeInfo[] getNameNodeInfos(int nsIndex) {
+ int i = 0;
+ for (String ns : this.namenodes.keys()) {
+ if (i++ == nsIndex) {
+ return this.namenodes.get(ns).toArray(new NameNodeInfo[0]);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @param nameservice id of nameservice to read
+ * @return all the namenodes bound to the given nameservice
+ */
+ public NameNodeInfo[] getNameNodeInfos(String nameservice) {
+ for (String ns : this.namenodes.keys()) {
+ if (nameservice.equals(ns)) {
+ return this.namenodes.get(ns).toArray(new NameNodeInfo[0]);
+ }
+ }
+ return null;
+ }
+
+
+ private void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
+ boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, int nnIndex)
+ throws IOException {
if (nameserviceId != null) {
conf.set(DFS_NAMESERVICE_ID, nameserviceId);
}
if (nnId != null) {
conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
}
-
if (manageNameDfsDirs) {
if (enableManagedDfsDirsRedundancy) {
- conf.set(DFS_NAMENODE_NAME_DIR_KEY,
- fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
- fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
- conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
- fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
- fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
+ File[] files = getNameNodeDirectory(nsIndex, nnIndex);
+ conf.set(DFS_NAMENODE_NAME_DIR_KEY, fileAsURI(files[0]) + "," + fileAsURI(files[1]));
+ files = getCheckpointDirectory(nsIndex, nnIndex);
+ conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, fileAsURI(files[0]) + "," + fileAsURI(files[1]));
} else {
- conf.set(DFS_NAMENODE_NAME_DIR_KEY,
- fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1))).
- toString());
- conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
- fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1))).
- toString());
+ File[] files = getNameNodeDirectory(nsIndex, nnIndex);
+ conf.set(DFS_NAMENODE_NAME_DIR_KEY, fileAsURI(files[0]).toString());
+ files = getCheckpointDirectory(nsIndex, nnIndex);
+ conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, fileAsURI(files[0]).toString());
}
}
}
+ private File[] getNameNodeDirectory(int nameserviceIndex, int nnIndex) {
+ return getNameNodeDirectory(base_dir, nameserviceIndex, nnIndex);
+ }
+
+ public static File[] getNameNodeDirectory(String base_dir, int nsIndex, int nnIndex) {
+ return getNameNodeDirectory(new File(base_dir), nsIndex, nnIndex);
+ }
+
+ public static File[] getNameNodeDirectory(File base_dir, int nsIndex, int nnIndex) {
+ File[] files = new File[2];
+ files[0] = new File(base_dir, "name-" + nsIndex + "-" + (2 * nnIndex + 1));
+ files[1] = new File(base_dir, "name-" + nsIndex + "-" + (2 * nnIndex + 2));
+ return files;
+ }
+
+ public File[] getCheckpointDirectory(int nsIndex, int nnIndex) {
+ return getCheckpointDirectory(base_dir, nsIndex, nnIndex);
+ }
+
+ public static File[] getCheckpointDirectory(String base_dir, int nsIndex, int nnIndex) {
+ return getCheckpointDirectory(new File(base_dir), nsIndex, nnIndex);
+ }
+
+ public static File[] getCheckpointDirectory(File base_dir, int nsIndex, int nnIndex) {
+ File[] files = new File[2];
+ files[0] = new File(base_dir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 1));
+ files[1] = new File(base_dir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 2));
+ return files;
+ }
+
+
public static void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs,
Configuration dstConf) throws IOException {
URI srcDir = Lists.newArrayList(srcDirs).get(0);
@@ -1094,12 +1191,9 @@ public class MiniDFSCluster {
new String[] {} : new String[] {operation.getName()};
return args;
}
-
- private void createNameNode(int nnIndex, Configuration conf,
- int numDataNodes, boolean format, StartupOption operation,
- String clusterId, String nameserviceId,
- String nnId)
- throws IOException {
+
+ private NameNodeInfo createNameNode(Configuration conf, boolean format, StartupOption operation,
+ String clusterId, String nameserviceId, String nnId) throws IOException {
// Format and clean out DataNode directories
if (format) {
DFSTestUtil.formatNameNode(conf);
@@ -1113,7 +1207,7 @@ public class MiniDFSCluster {
String[] args = createArgs(operation);
NameNode nn = NameNode.createNameNode(args, conf);
if (operation == StartupOption.RECOVER) {
- return;
+ return null;
}
// After the NN has started, set back the bound ports into
@@ -1131,14 +1225,17 @@ public class MiniDFSCluster {
DFSUtil.setGenericConf(conf, nameserviceId, nnId,
DFS_NAMENODE_HTTP_ADDRESS_KEY);
- nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId,
+ NameNodeInfo info = new NameNodeInfo(nn, nameserviceId, nnId,
operation, new Configuration(conf));
+ namenodes.put(nameserviceId, info);
+
// Restore the default fs name
if (originalDefaultFs == null) {
conf.set(FS_DEFAULT_NAME_KEY, "");
} else {
conf.set(FS_DEFAULT_NAME_KEY, originalDefaultFs);
}
+ return info;
}
/**
@@ -1154,7 +1251,7 @@ public class MiniDFSCluster {
*/
public URI getURI(int nnIndex) {
String hostPort =
- nameNodes[nnIndex].nameNode.getNameNodeAddressHostPortString();
+ getNN(nnIndex).nameNode.getNameNodeAddressHostPortString();
URI uri = null;
try {
uri = new URI("hdfs://" + hostPort);
@@ -1172,9 +1269,21 @@ public class MiniDFSCluster {
* @return Configuration of for the given namenode
*/
public Configuration getConfiguration(int nnIndex) {
- return nameNodes[nnIndex].conf;
+ return getNN(nnIndex).conf;
}
+ private NameNodeInfo getNN(int nnIndex) {
+ int count = 0;
+ for (NameNodeInfo nn : namenodes.values()) {
+ if (count == nnIndex) {
+ return nn;
+ }
+ count++;
+ }
+ return null;
+ }
+
+
/**
* wait for the given namenode to get out of safemode.
*/
@@ -1593,7 +1702,7 @@ public class MiniDFSCluster {
* @throws Exception
*/
public void finalizeCluster(int nnIndex, Configuration conf) throws Exception {
- finalizeNamenode(nameNodes[nnIndex].nameNode, nameNodes[nnIndex].conf);
+ finalizeNamenode(getNN(nnIndex).nameNode, getNN(nnIndex).conf);
}
/**
@@ -1604,7 +1713,7 @@ public class MiniDFSCluster {
* @throws IllegalStateException if the Namenode is not running.
*/
public void finalizeCluster(Configuration conf) throws Exception {
- for (NameNodeInfo nnInfo : nameNodes) {
+ for (NameNodeInfo nnInfo : namenodes.values()) {
if (nnInfo == null) {
throw new IllegalStateException("Attempting to finalize "
+ "Namenode but it is not running");
@@ -1612,9 +1721,9 @@ public class MiniDFSCluster {
finalizeNamenode(nnInfo.nameNode, nnInfo.conf);
}
}
-
+
public int getNumNameNodes() {
- return nameNodes.length;
+ return namenodes.size();
}
/**
@@ -1644,7 +1753,7 @@ public class MiniDFSCluster {
* Gets the NameNode for the index. May be null.
*/
public NameNode getNameNode(int nnIndex) {
- return nameNodes[nnIndex].nameNode;
+ return getNN(nnIndex).nameNode;
}
/**
@@ -1653,11 +1762,11 @@ public class MiniDFSCluster {
*/
public FSNamesystem getNamesystem() {
checkSingleNameNode();
- return NameNodeAdapter.getNamesystem(nameNodes[0].nameNode);
+ return NameNodeAdapter.getNamesystem(getNN(0).nameNode);
}
-
+
public FSNamesystem getNamesystem(int nnIndex) {
- return NameNodeAdapter.getNamesystem(nameNodes[nnIndex].nameNode);
+ return NameNodeAdapter.getNamesystem(getNN(nnIndex).nameNode);
}
/**
@@ -1697,14 +1806,14 @@ public class MiniDFSCluster {
* caller supplied port is not necessarily the actual port used.
*/
public int getNameNodePort(int nnIndex) {
- return nameNodes[nnIndex].nameNode.getNameNodeAddress().getPort();
+ return getNN(nnIndex).nameNode.getNameNodeAddress().getPort();
}
/**
* @return the service rpc port used by the NameNode at the given index.
*/
public int getNameNodeServicePort(int nnIndex) {
- return nameNodes[nnIndex].nameNode.getServiceRpcAddress().getPort();
+ return getNN(nnIndex).nameNode.getServiceRpcAddress().getPort();
}
/**
@@ -1745,7 +1854,7 @@ public class MiniDFSCluster {
fileSystems.clear();
}
shutdownDataNodes();
- for (NameNodeInfo nnInfo : nameNodes) {
+ for (NameNodeInfo nnInfo : namenodes.values()) {
if (nnInfo == null) continue;
NameNode nameNode = nnInfo.nameNode;
if (nameNode != null) {
@@ -1781,7 +1890,7 @@ public class MiniDFSCluster {
* Shutdown all the namenodes.
*/
public synchronized void shutdownNameNodes() {
- for (int i = 0; i < nameNodes.length; i++) {
+ for (int i = 0; i < namenodes.size(); i++) {
shutdownNameNode(i);
}
}
@@ -1790,13 +1899,15 @@ public class MiniDFSCluster {
* Shutdown the namenode at a given index.
*/
public synchronized void shutdownNameNode(int nnIndex) {
- NameNode nn = nameNodes[nnIndex].nameNode;
+ NameNodeInfo info = getNN(nnIndex);
+ NameNode nn = info.nameNode;
if (nn != null) {
LOG.info("Shutting down the namenode");
nn.stop();
nn.join();
- Configuration conf = nameNodes[nnIndex].conf;
- nameNodes[nnIndex] = new NameNodeInfo(null, null, null, null, conf);
+ info.nnId = null;
+ info.nameNode = null;
+ info.nameserviceId = null;
}
}
@@ -1804,7 +1915,7 @@ public class MiniDFSCluster {
* Restart all namenodes.
*/
public synchronized void restartNameNodes() throws IOException {
- for (int i = 0; i < nameNodes.length; i++) {
+ for (int i = 0; i < namenodes.size(); i++) {
restartNameNode(i, false);
}
waitActive();
@@ -1840,19 +1951,19 @@ public class MiniDFSCluster {
*/
public synchronized void restartNameNode(int nnIndex, boolean waitActive,
String... args) throws IOException {
- String nameserviceId = nameNodes[nnIndex].nameserviceId;
- String nnId = nameNodes[nnIndex].nnId;
- StartupOption startOpt = nameNodes[nnIndex].startOpt;
- Configuration conf = nameNodes[nnIndex].conf;
+ NameNodeInfo info = getNN(nnIndex);
+ StartupOption startOpt = info.startOpt;
+
shutdownNameNode(nnIndex);
if (args.length != 0) {
startOpt = null;
} else {
args = createArgs(startOpt);
}
- NameNode nn = NameNode.createNameNode(args, conf);
- nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, startOpt,
- conf);
+
+ NameNode nn = NameNode.createNameNode(args, info.conf);
+ info.nameNode = nn;
+ info.setStartOpt(startOpt);
if (waitActive) {
waitClusterUp();
LOG.info("Restarted the namenode");
@@ -2124,7 +2235,7 @@ public class MiniDFSCluster {
* or if waiting for safe mode is disabled.
*/
public boolean isNameNodeUp(int nnIndex) {
- NameNode nameNode = nameNodes[nnIndex].nameNode;
+ NameNode nameNode = getNN(nnIndex).nameNode;
if (nameNode == null) {
return false;
}
@@ -2142,7 +2253,7 @@ public class MiniDFSCluster {
* Returns true if all the NameNodes are running and is out of Safe Mode.
*/
public boolean isClusterUp() {
- for (int index = 0; index < nameNodes.length; index++) {
+ for (int index = 0; index < namenodes.size(); index++) {
if (!isNameNodeUp(index)) {
return false;
}
@@ -2172,15 +2283,13 @@ public class MiniDFSCluster {
checkSingleNameNode();
return getFileSystem(0);
}
-
+
/**
* Get a client handle to the DFS cluster for the namenode at given index.
*/
public DistributedFileSystem getFileSystem(int nnIndex) throws IOException {
- DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(
- getURI(nnIndex), nameNodes[nnIndex].conf);
- fileSystems.add(dfs);
- return dfs;
+ return (DistributedFileSystem) addFileSystem(FileSystem.get(getURI(nnIndex),
+ getNN(nnIndex).conf));
}
/**
@@ -2188,17 +2297,20 @@ public class MiniDFSCluster {
* This simulating different threads working on different FileSystem instances.
*/
public FileSystem getNewFileSystemInstance(int nnIndex) throws IOException {
- FileSystem dfs = FileSystem.newInstance(getURI(nnIndex), nameNodes[nnIndex].conf);
- fileSystems.add(dfs);
- return dfs;
+ return addFileSystem(FileSystem.newInstance(getURI(nnIndex), getNN(nnIndex).conf));
}
-
+
+ private <T extends FileSystem> T addFileSystem(T fs) {
+ fileSystems.add(fs);
+ return fs;
+ }
+
/**
* @return a http URL
*/
public String getHttpUri(int nnIndex) {
return "http://"
- + nameNodes[nnIndex].conf
+ + getNN(nnIndex).conf
.get(DFS_NAMENODE_HTTP_ADDRESS_KEY);
}
@@ -2206,14 +2318,14 @@ public class MiniDFSCluster {
* Get the directories where the namenode stores its image.
*/
public Collection<URI> getNameDirs(int nnIndex) {
- return FSNamesystem.getNamespaceDirs(nameNodes[nnIndex].conf);
+ return FSNamesystem.getNamespaceDirs(getNN(nnIndex).conf);
}
/**
* Get the directories where the namenode stores its edits.
*/
public Collection<URI> getNameEditsDirs(int nnIndex) throws IOException {
- return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
+ return FSNamesystem.getNamespaceEditsDirs(getNN(nnIndex).conf);
}
public void transitionToActive(int nnIndex) throws IOException,
@@ -2254,11 +2366,12 @@ public class MiniDFSCluster {
/** Wait until the given namenode gets registration from all the datanodes */
public void waitActive(int nnIndex) throws IOException {
- if (nameNodes.length == 0 || nameNodes[nnIndex] == null
- || nameNodes[nnIndex].nameNode == null) {
+ if (namenodes.size() == 0 || getNN(nnIndex) == null || getNN(nnIndex).nameNode == null) {
return;
}
- InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
+
+ NameNodeInfo info = getNN(nnIndex);
+ InetSocketAddress addr = info.nameNode.getServiceRpcAddress();
assert addr.getPort() != 0;
DFSClient client = new DFSClient(addr, conf);
@@ -2278,7 +2391,7 @@ public class MiniDFSCluster {
* Wait until the cluster is active and running.
*/
public void waitActive() throws IOException {
- for (int index = 0; index < nameNodes.length; index++) {
+ for (int index = 0; index < namenodes.size(); index++) {
int failedCount = 0;
while (true) {
try {
@@ -2298,7 +2411,14 @@ public class MiniDFSCluster {
}
LOG.info("Cluster is active");
}
-
+
+ public void printNNs() {
+ for (int i = 0; i < namenodes.size(); i++) {
+ LOG.info("Have namenode " + i + ", info:" + getNN(i));
+ LOG.info(" has namenode: " + getNN(i).nameNode);
+ }
+ }
+
private synchronized boolean shouldWait(DatanodeInfo[] dnInfo,
InetSocketAddress addr) {
// If a datanode failed to start, then do not wait
@@ -2696,7 +2816,7 @@ public class MiniDFSCluster {
* namenode
*/
private void checkSingleNameNode() {
- if (nameNodes.length != 1) {
+ if (namenodes.size() != 1) {
throw new IllegalArgumentException("Namenode index is needed");
}
}
@@ -2712,13 +2832,9 @@ public class MiniDFSCluster {
if(!federation)
throw new IOException("cannot add namenode to non-federated cluster");
- int nnIndex = nameNodes.length;
- int numNameNodes = nameNodes.length + 1;
- NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes];
- System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length);
- nameNodes = newlist;
- String nameserviceId = NAMESERVICE_ID_PREFIX + (nnIndex + 1);
-
+ int nameServiceIndex = namenodes.keys().size();
+ String nameserviceId = NAMESERVICE_ID_PREFIX + (namenodes.keys().size() + 1);
+
String nameserviceIds = conf.get(DFS_NAMESERVICES);
nameserviceIds += "," + nameserviceId;
conf.set(DFS_NAMESERVICES, nameserviceIds);
@@ -2726,9 +2842,11 @@ public class MiniDFSCluster {
String nnId = null;
initNameNodeAddress(conf, nameserviceId,
new NNConf(nnId).setIpcPort(namenodePort));
- initNameNodeConf(conf, nameserviceId, nnId, true, true, nnIndex);
- createNameNode(nnIndex, conf, numDataNodes, true, null, null,
- nameserviceId, nnId);
+ // figure out the current number of NNs
+ NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId);
+ int nnIndex = infos == null ? 0 : infos.length;
+ initNameNodeConf(conf, nameserviceId, nameServiceIndex, nnId, true, true, nnIndex);
+ NameNodeInfo info = createNameNode(conf, true, null, null, nameserviceId, nnId);
// Refresh datanodes with the newly started namenode
for (DataNodeProperties dn : dataNodes) {
@@ -2738,7 +2856,7 @@ public class MiniDFSCluster {
// Wait for new namenode to get registrations from all the datanodes
waitActive(nnIndex);
- return nameNodes[nnIndex].nameNode;
+ return info.nameNode;
}
protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
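The array bookkeeping removed above is replaced by a Multimap keyed on nameservice id, so namenodes can now be looked up per nameservice as well as by flat index. A sketch of the new accessors (the "ns1" id is hypothetical):

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;

class NameNodeLookupSketch {
  static void show(MiniDFSCluster cluster) {
    // Flat view: every NN across all nameservices, in insertion order.
    for (NameNodeInfo info : cluster.getNameNodeInfos()) {
      System.out.println(info.nameNode.getNameNodeAddressHostPortString());
    }
    // Federated views: the NNs of one nameservice, by id or by position.
    NameNodeInfo[] byId = cluster.getNameNodeInfos("ns1");
    NameNodeInfo[] byIndex = cluster.getNameNodeInfos(0);
    if (byId != null) {
      System.out.println(byId.length + " NNs in ns1, " + byIndex.length + " in first NS");
    }
  }
}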
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
index a99e9c3..b9786a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
@@ -56,10 +56,20 @@ public class MiniDFSNNTopology {
* Set up an HA topology with a single HA nameservice.
*/
public static MiniDFSNNTopology simpleHATopology() {
- return new MiniDFSNNTopology()
- .addNameservice(new MiniDFSNNTopology.NSConf("minidfs-ns")
- .addNN(new MiniDFSNNTopology.NNConf("nn1"))
- .addNN(new MiniDFSNNTopology.NNConf("nn2")));
+ return simpleHATopology(2);
+ }
+
+ /**
+ * Set up an HA topology with a single HA nameservice.
+ * @param nnCount number of namenodes to use with the nameservice
+ */
+ public static MiniDFSNNTopology simpleHATopology(int nnCount) {
+ MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf("minidfs-ns");
+ for (int i = 1; i <= nnCount; i++) {
+ nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i));
+ }
+ MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(nameservice);
+ return topology;
}
/**
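For tests that only need the topology object, the same generalization is available directly; a small sketch assuming a Configuration is at hand:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;

class TopologySketch {
  static MiniDFSCluster startThreeNNCluster(Configuration conf) throws IOException {
    return new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology(3))  // nn1..nn3 in "minidfs-ns"
        .numDataNodes(1)  // datanode count is arbitrary here
        .build();
  }
}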
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
index ad907f6..fae1024 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
@@ -303,12 +303,12 @@ public class TestDFSUpgradeFromImage {
unpackStorage(HADOOP22_IMAGE, HADOOP_DFS_DIR_TXT);
// Overwrite the md5 stored in the VERSION files
- File baseDir = new File(MiniDFSCluster.getBaseDirectory());
+ File[] nnDirs = MiniDFSCluster.getNameNodeDirectory(MiniDFSCluster.getBaseDirectory(), 0, 0);
FSImageTestUtil.corruptVersionFile(
- new File(baseDir, "name1/current/VERSION"),
+ new File(nnDirs[0], "current/VERSION"),
"imageMD5Digest", "22222222222222222222222222222222");
FSImageTestUtil.corruptVersionFile(
- new File(baseDir, "name2/current/VERSION"),
+ new File(nnDirs[1], "current/VERSION"),
"imageMD5Digest", "22222222222222222222222222222222");
// Attach our own log appender so we can verify output
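The fixed "name1"/"name2" directories are gone: storage paths are now derived from both the nameservice index and the NN index, which is what this test now queries. A sketch of the resulting layout, per getNameNodeDirectory/getCheckpointDirectory above:

import java.io.File;
import org.apache.hadoop.hdfs.MiniDFSCluster;

class DirLayoutSketch {
  public static void main(String[] args) {
    String base = MiniDFSCluster.getBaseDirectory();
    File[] name = MiniDFSCluster.getNameNodeDirectory(base, 0, 0);
    File[] ckpt = MiniDFSCluster.getCheckpointDirectory(base, 0, 0);
    System.out.println(name[0]);  // <base>/name-0-1
    System.out.println(name[1]);  // <base>/name-0-2
    System.out.println(ckpt[0]);  // <base>/namesecondary-0-1
  }
}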
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
index c4c890f..b50b1cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
/**
* This class tests rolling upgrade.
@@ -66,7 +67,7 @@ public class TestRollingUpgrade {
*/
@Test
public void testDFSAdminRollingUpgradeCommands() throws Exception {
- // start a cluster
+ // start a cluster
final Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
@@ -97,7 +98,7 @@ public class TestRollingUpgrade {
runCmd(dfsadmin, true, "-rollingUpgrade", "query");
dfs.mkdirs(bar);
-
+
//finalize rolling upgrade
runCmd(dfsadmin, true, "-rollingUpgrade", "finalize");
@@ -143,7 +144,7 @@ public class TestRollingUpgrade {
String nnDirPrefix = MiniDFSCluster.getBaseDirectory() + "/nn/";
final File nn1Dir = new File(nnDirPrefix + "image1");
final File nn2Dir = new File(nnDirPrefix + "image2");
-
+
LOG.info("nn1Dir=" + nn1Dir);
LOG.info("nn2Dir=" + nn2Dir);
@@ -186,9 +187,9 @@ public class TestRollingUpgrade {
final RollingUpgradeInfo info1;
{
- final DistributedFileSystem dfs = cluster.getFileSystem();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
dfs.mkdirs(foo);
-
+
//start rolling upgrade
dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
info1 = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
@@ -197,7 +198,7 @@ public class TestRollingUpgrade {
//query rolling upgrade
Assert.assertEquals(info1, dfs.rollingUpgrade(RollingUpgradeAction.QUERY));
-
+
dfs.mkdirs(bar);
cluster.shutdown();
}
@@ -209,8 +210,8 @@ public class TestRollingUpgrade {
.format(false)
.manageNameDfsDirs(false)
.build();
- final DistributedFileSystem dfs2 = cluster2.getFileSystem();
-
+ final DistributedFileSystem dfs2 = cluster2.getFileSystem();
+
// Check that cluster2 sees the edits made on cluster1
Assert.assertTrue(dfs2.exists(foo));
Assert.assertTrue(dfs2.exists(bar));
@@ -260,7 +261,7 @@ public class TestRollingUpgrade {
@Test
public void testRollback() throws IOException {
- // start a cluster
+ // start a cluster
final Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
@@ -305,7 +306,7 @@ public class TestRollingUpgrade {
if(cluster != null) cluster.shutdown();
}
}
-
+
private static void startRollingUpgrade(Path foo, Path bar,
Path file, byte[] data,
MiniDFSCluster cluster) throws IOException {
@@ -327,7 +328,7 @@ public class TestRollingUpgrade {
TestFileTruncate.checkBlockRecovery(file, dfs);
AppendTestUtil.checkFullFile(dfs, file, newLength, data);
}
-
+
private static void rollbackRollingUpgrade(Path foo, Path bar,
Path file, byte[] data,
MiniDFSCluster cluster) throws IOException {
@@ -372,22 +373,33 @@ public class TestRollingUpgrade {
}
}
- @Test (timeout = 300000)
+ @Test(timeout = 300000)
public void testFinalize() throws Exception {
+ testFinalize(2);
+ }
+
+ @Test(timeout = 300000)
+ public void testFinalizeWithMultipleNN() throws Exception {
+ testFinalize(3);
+ }
+
+ private void testFinalize(int nnCount) throws Exception {
final Configuration conf = new HdfsConfiguration();
MiniQJMHACluster cluster = null;
final Path foo = new Path("/foo");
final Path bar = new Path("/bar");
try {
- cluster = new MiniQJMHACluster.Builder(conf).build();
+ cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
MiniDFSCluster dfsCluster = cluster.getDfsCluster();
dfsCluster.waitActive();
- // let NN1 tail editlog every 1s
- dfsCluster.getConfiguration(1).setInt(
- DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
- dfsCluster.restartNameNode(1);
+ // let the other NNs tail the editlog every 1s
+ for (int i = 1; i < nnCount; i++) {
+ dfsCluster.getConfiguration(i).setInt(
+ DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ }
+ dfsCluster.restartNameNodes();
dfsCluster.transitionToActive(0);
DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
@@ -425,17 +437,29 @@ public class TestRollingUpgrade {
@Test (timeout = 300000)
public void testQuery() throws Exception {
+ testQuery(2);
+ }
+
+ @Test (timeout = 300000)
+ public void testQueryWithMultipleNN() throws Exception {
+ testQuery(3);
+ }
+
+ private void testQuery(int nnCount) throws Exception{
final Configuration conf = new Configuration();
MiniQJMHACluster cluster = null;
try {
- cluster = new MiniQJMHACluster.Builder(conf).build();
+ cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
MiniDFSCluster dfsCluster = cluster.getDfsCluster();
dfsCluster.waitActive();
dfsCluster.transitionToActive(0);
DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
- dfsCluster.shutdownNameNode(1);
+ // shutdown other NNs
+ for (int i = 1; i < nnCount; i++) {
+ dfsCluster.shutdownNameNode(i);
+ }
// start rolling upgrade
RollingUpgradeInfo info = dfs
@@ -445,13 +469,16 @@ public class TestRollingUpgrade {
info = dfs.rollingUpgrade(RollingUpgradeAction.QUERY);
Assert.assertFalse(info.createdRollbackImages());
- dfsCluster.restartNameNode(1);
-
+ // restart other NNs
+ for (int i = 1; i < nnCount; i++) {
+ dfsCluster.restartNameNode(i);
+ }
+ // check that one of the other NNs has created the rollback image and uploaded it
queryForPreparation(dfs);
// The NN should have a copy of the fsimage in case of rollbacks.
Assert.assertTrue(dfsCluster.getNamesystem(0).getFSImage()
- .hasRollbackFSImage());
+ .hasRollbackFSImage());
} finally {
if (cluster != null) {
cluster.shutdown();
@@ -487,6 +514,15 @@ public class TestRollingUpgrade {
@Test(timeout = 300000)
public void testCheckpoint() throws IOException, InterruptedException {
+ testCheckpoint(2);
+ }
+
+ @Test(timeout = 300000)
+ public void testCheckpointWithMultipleNN() throws IOException, InterruptedException {
+ testCheckpoint(3);
+ }
+
+ public void testCheckpoint(int nnCount) throws IOException, InterruptedException {
final Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1);
@@ -495,7 +531,7 @@ public class TestRollingUpgrade {
final Path foo = new Path("/foo");
try {
- cluster = new MiniQJMHACluster.Builder(conf).build();
+ cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
MiniDFSCluster dfsCluster = cluster.getDfsCluster();
dfsCluster.waitActive();
@@ -513,16 +549,9 @@ public class TestRollingUpgrade {
long txid = dfs.rollEdits();
Assert.assertTrue(txid > 0);
- int retries = 0;
- while (++retries < 5) {
- NNStorage storage = dfsCluster.getNamesystem(1).getFSImage()
- .getStorage();
- if (storage.getFsImageName(txid - 1) != null) {
- return;
- }
- Thread.sleep(1000);
+ for (int i = 1; i < nnCount; i++) {
+ verifyNNCheckpoint(dfsCluster, txid, i);
}
- Assert.fail("new checkpoint does not exist");
} finally {
if (cluster != null) {
@@ -531,6 +560,22 @@ public class TestRollingUpgrade {
}
}
+ /**
+ * Verify that the namenode at the given index has an FSImage with a TxId up to txid-1
+ */
+ private void verifyNNCheckpoint(MiniDFSCluster dfsCluster, long txid, int nnIndex) throws InterruptedException {
+ int retries = 0;
+ while (++retries < 5) {
+ NNStorage storage = dfsCluster.getNamesystem(nnIndex).getFSImage()
+ .getStorage();
+ if (storage.getFsImageName(txid - 1) != null) {
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ Assert.fail("new checkpoint does not exist");
+ }
+
static void queryForPreparation(DistributedFileSystem dfs) throws IOException,
InterruptedException {
RollingUpgradeInfo info;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
index ef4c559..470a08b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
@@ -17,43 +17,39 @@
*/
package org.apache.hadoop.hdfs.qjournal;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
-
-import java.io.IOException;
-import java.net.BindException;
-import java.net.URI;
-import java.util.Random;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
public class MiniQJMHACluster {
private MiniDFSCluster cluster;
private MiniJournalCluster journalCluster;
private final Configuration conf;
private static final Log LOG = LogFactory.getLog(MiniQJMHACluster.class);
-
+
public static final String NAMESERVICE = "ns1";
- private static final String NN1 = "nn1";
- private static final String NN2 = "nn2";
private static final Random RANDOM = new Random();
private int basePort = 10000;
public static class Builder {
private final Configuration conf;
private StartupOption startOpt = null;
+ private int numNNs = 2;
private final MiniDFSCluster.Builder dfsBuilder;
-
+
public Builder(Configuration conf) {
this.conf = conf;
// most QJMHACluster tests don't need DataNodes, so we'll make
@@ -64,7 +60,7 @@ public class MiniQJMHACluster {
public MiniDFSCluster.Builder getDfsBuilder() {
return dfsBuilder;
}
-
+
public MiniQJMHACluster build() throws IOException {
return new MiniQJMHACluster(this);
}
@@ -72,15 +68,25 @@ public class MiniQJMHACluster {
public void startupOption(StartupOption startOpt) {
this.startOpt = startOpt;
}
+
+ public Builder setNumNameNodes(int nns) {
+ this.numNNs = nns;
+ return this;
+ }
+ }
+
+ public static MiniDFSNNTopology createDefaultTopology(int nns, int startingPort) {
+ MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf(NAMESERVICE);
+ for (int i = 0; i < nns; i++) {
+ nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i).setIpcPort(startingPort++)
+ .setHttpPort(startingPort++));
+ }
+
+ return new MiniDFSNNTopology().addNameservice(nameservice);
}
-
+
public static MiniDFSNNTopology createDefaultTopology(int basePort) {
- return new MiniDFSNNTopology()
- .addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
- new MiniDFSNNTopology.NNConf("nn1").setIpcPort(basePort)
- .setHttpPort(basePort + 1)).addNN(
- new MiniDFSNNTopology.NNConf("nn2").setIpcPort(basePort + 2)
- .setHttpPort(basePort + 3)));
+ return createDefaultTopology(2, basePort);
}
private MiniQJMHACluster(Builder builder) throws IOException {
@@ -94,10 +100,10 @@ public class MiniQJMHACluster {
.build();
URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
- // start cluster with 2 NameNodes
- MiniDFSNNTopology topology = createDefaultTopology(basePort);
+ // start cluster with specified NameNodes
+ MiniDFSNNTopology topology = createDefaultTopology(builder.numNNs, basePort);
- initHAConf(journalURI, builder.conf);
+ initHAConf(journalURI, builder.conf, builder.numNNs);
// First start up the NNs just to format the namespace. The MiniDFSCluster
// has no way to just format the NameNodes without also starting them.
@@ -110,8 +116,9 @@ public class MiniQJMHACluster {
Configuration confNN0 = cluster.getConfiguration(0);
NameNode.initializeSharedEdits(confNN0, true);
- cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt);
- cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt);
+ for (MiniDFSCluster.NameNodeInfo nn : cluster.getNameNodeInfos()) {
+ nn.setStartOpt(builder.startOpt);
+ }
// restart the cluster
cluster.restartNameNodes();
@@ -123,31 +130,28 @@ public class MiniQJMHACluster {
}
}
}
-
- private Configuration initHAConf(URI journalURI, Configuration conf) {
+
+ private Configuration initHAConf(URI journalURI, Configuration conf, int numNNs) {
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
journalURI.toString());
-
- String address1 = "127.0.0.1:" + basePort;
- String address2 = "127.0.0.1:" + (basePort + 2);
- conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
- NAMESERVICE, NN1), address1);
- conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
- NAMESERVICE, NN2), address2);
- conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE);
- conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE),
- NN1 + "," + NN2);
- conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE,
- ConfiguredFailoverProxyProvider.class.getName());
- conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE);
-
+
+ List<String> nns = new ArrayList<String>(numNNs);
+ int port = basePort;
+ for (int i = 0; i < numNNs; i++) {
+ nns.add("127.0.0.1:" + port);
+ // increment by 2 each time to account for the http port in the config setting
+ port += 2;
+ }
+
+ // use standard failover configurations
+ HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns);
return conf;
}
public MiniDFSCluster getDfsCluster() {
return cluster;
}
-
+
public MiniJournalCluster getJournalCluster() {
return journalCluster;
}
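Each NN claims two consecutive ports (IPC, then HTTP) starting at the base port, so the generalized topology is laid out deterministically; a sketch:

import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;

class PortLayoutSketch {
  public static void main(String[] args) {
    // nn0: ipc 10000 / http 10001; nn1: ipc 10002 / http 10003; nn2: ipc 10004 / http 10005
    MiniDFSNNTopology topology = MiniQJMHACluster.createDefaultTopology(3, 10000);
    System.out.println(topology.countNameNodes());  // 3
  }
}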
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index d5a9426..b203872 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -162,7 +162,7 @@ public class TestBlockToken {
public void testWritable() throws Exception {
TestWritable.testWritable(new BlockTokenIdentifier());
BlockTokenSecretManager sm = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
TestWritable.testWritable(generateTokenId(sm, block1,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)));
TestWritable.testWritable(generateTokenId(sm, block2,
@@ -201,7 +201,7 @@ public class TestBlockToken {
@Test
public void testBlockTokenSecretManager() throws Exception {
BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
ExportedBlockKeys keys = masterHandler.exportKeys();
@@ -244,7 +244,7 @@ public class TestBlockToken {
UserGroupInformation.setConfiguration(conf);
BlockTokenSecretManager sm = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
@@ -283,7 +283,7 @@ public class TestBlockToken {
Assume.assumeTrue(FD_DIR.exists());
BlockTokenSecretManager sm = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
@@ -352,7 +352,7 @@ public class TestBlockToken {
for (int i = 0; i < 10; i++) {
String bpid = Integer.toString(i);
BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
bpMgr.addBlockPool(bpid, slaveHandler);
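The hunks above only thread two extra int arguments through the BlockTokenSecretManager constructor. This file does not name them, but in the context of HDFS-6440 they are plausibly the index of this NN and the total NN count, used to give each NN a disjoint token key/serial range; read the "0, 1" calls above under that assumption:

// Parameter meanings below are assumptions, not stated in this hunk.
BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
    blockKeyUpdateInterval, blockTokenLifetime,
    0 /* assumed: index of this NN */, 1 /* assumed: total NN count */,
    "fake-pool", null);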
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
index f01be4b..0818571 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
@@ -319,7 +319,7 @@ public class TestBackupNode {
if(fileSys != null) fileSys.close();
if(cluster != null) cluster.shutdown();
}
- File nnCurDir = new File(BASE_DIR, "name1/current/");
+ File nnCurDir = new File(MiniDFSCluster.getNameNodeDirectory(BASE_DIR, 0, 0)[0], "current/");
File bnCurDir = new File(getBackupNodeDir(op, 1), "/current/");
FSImageTestUtil.assertParallelFilesAreIdentical(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
index 5a51cb7..7073726 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
@@ -1428,7 +1428,8 @@ public class TestCheckpoint {
//
secondary = startSecondaryNameNode(conf);
- File secondaryDir = new File(MiniDFSCluster.getBaseDirectory(), "namesecondary1");
+ File secondaryDir = MiniDFSCluster.getCheckpointDirectory(MiniDFSCluster.getBaseDirectory(),
+ 0, 0)[0];
File secondaryCurrent = new File(secondaryDir, "current");
long expectedTxIdToDownload = cluster.getNameNode().getFSImage()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
index 5b72901..a736d27 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
@@ -42,7 +42,8 @@ public class HAStressTestHarness {
private MiniDFSCluster cluster;
static final int BLOCK_SIZE = 1024;
final TestContext testCtx = new TestContext();
-
+ private int nns = 2;
+
public HAStressTestHarness() {
conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -55,11 +56,19 @@ public class HAStressTestHarness {
}
/**
+ * Set the number of namenodes that should be run. This must be set before calling
+ * {@link #startCluster()}
+ */
+ public void setNumberOfNameNodes(int nns) {
+ this.nns = nns;
+ }
+
+ /**
* Start and return the MiniDFSCluster.
*/
public MiniDFSCluster startCluster() throws IOException {
cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .nnTopology(MiniDFSNNTopology.simpleHATopology(nns))
.numDataNodes(3)
.build();
return cluster;
@@ -99,28 +108,27 @@ public class HAStressTestHarness {
}
/**
- * Add a thread which periodically triggers failover back and forth between
- * the two namenodes.
+ * Add a thread which periodically triggers failover back and forth between the namenodes.
*/
public void addFailoverThread(final int msBetweenFailovers) {
testCtx.addThread(new RepeatingTestThread(testCtx) {
-
@Override
public void doAnAction() throws Exception {
- System.err.println("==============================\n" +
- "Failing over from 0->1\n" +
- "==================================");
- cluster.transitionToStandby(0);
- cluster.transitionToActive(1);
-
- Thread.sleep(msBetweenFailovers);
- System.err.println("==============================\n" +
- "Failing over from 1->0\n" +
- "==================================");
-
- cluster.transitionToStandby(1);
- cluster.transitionToActive(0);
- Thread.sleep(msBetweenFailovers);
+ // fail over from one namenode to the next, all the way back to the original NN
+ for (int i = 0; i < nns; i++) {
+ // next node, mod nns so we wrap to the 0th NN on the last iteration
+ int next = (i + 1) % nns;
+ System.err.println("==============================\n"
+ + "[Starting] Failing over from " + i + "->" + next + "\n"
+ + "==============================");
+ cluster.transitionToStandby(i);
+ cluster.transitionToActive(next);
+ System.err.println("==============================\n"
+ + "[Completed] Failing over from " + i + "->" + next + ". Sleeping for "+
+ (msBetweenFailovers/1000) +"sec \n"
+ + "==============================");
+ Thread.sleep(msBetweenFailovers);
+ }
}
});
}
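A sketch of driving the new round-robin failover across three NNs (the interval is arbitrary; thread start/stop is managed by the harness's TestContext and elided here):

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.ha.HAStressTestHarness;

class FailoverSketch {
  public static void main(String[] args) throws Exception {
    HAStressTestHarness harness = new HAStressTestHarness();
    harness.setNumberOfNameNodes(3);   // must be called before startCluster()
    MiniDFSCluster cluster = harness.startCluster();
    cluster.transitionToActive(0);
    harness.addFailoverThread(5000);   // cycles the active role 0 -> 1 -> 2 -> 0 ...
  }
}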
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index c7c4a77..5543a2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -24,9 +24,14 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -67,12 +72,11 @@ public abstract class HATestUtil {
*/
public static void waitForStandbyToCatchUp(NameNode active,
NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException {
-
long activeTxId = active.getNamesystem().getFSImage().getEditLog()
.getLastWrittenTxId();
-
+
active.getRpcServer().rollEditLog();
-
+
long start = Time.now();
while (Time.now() - start < TestEditLogTailer.NN_LAG_TIMEOUT) {
long nn2HighestTxId = standby.getNamesystem().getFSImage()
@@ -166,34 +170,52 @@ public abstract class HATestUtil {
/** Sets the required configurations for performing failover. */
public static void setFailoverConfigurations(MiniDFSCluster cluster,
Configuration conf, String logicalName, int nsIndex) {
- InetSocketAddress nnAddr1 = cluster.getNameNode(2 * nsIndex).getNameNodeAddress();
- InetSocketAddress nnAddr2 = cluster.getNameNode(2 * nsIndex + 1).getNameNodeAddress();
- setFailoverConfigurations(conf, logicalName, nnAddr1, nnAddr2);
+ MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
+ List<InetSocketAddress> nnAddresses = new ArrayList<InetSocketAddress>(nns.length);
+ for (MiniDFSCluster.NameNodeInfo nn : nns) {
+ nnAddresses.add(nn.nameNode.getNameNodeAddress());
+ }
+ setFailoverConfigurations(conf, logicalName, nnAddresses);
+ }
+
+ public static void setFailoverConfigurations(Configuration conf, String logicalName,
+ InetSocketAddress... nnAddresses) {
+ setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses));
}
/**
* Sets the required configurations for performing failover
*/
public static void setFailoverConfigurations(Configuration conf,
- String logicalName, InetSocketAddress nnAddr1,
- InetSocketAddress nnAddr2) {
- String nameNodeId1 = "nn1";
- String nameNodeId2 = "nn2";
- String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
- String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
- conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
- logicalName, nameNodeId1), address1);
- conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
- logicalName, nameNodeId2), address2);
-
+ String logicalName, List<InetSocketAddress> nnAddresses) {
+ setFailoverConfigurations(conf, logicalName,
+ Iterables.transform(nnAddresses, new Function<InetSocketAddress, String>() {
+
+ // transform the inet address to a simple string
+ @Override
+ public String apply(InetSocketAddress addr) {
+ return "hdfs://" + addr.getHostName() + ":" + addr.getPort();
+ }
+ }));
+ }
+
+ public static void setFailoverConfigurations(Configuration conf, String logicalName,
+ Iterable<String> nnAddresses) {
+ List<String> nnids = new ArrayList<String>();
+ int i = 0;
+ for (String address : nnAddresses) {
+ String nnId = "nn" + (i + 1);
+ nnids.add(nnId);
+ conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, nnId), address);
+ i++;
+ }
conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName),
- nameNodeId1 + "," + nameNodeId2);
+ Joiner.on(',').join(nnids));
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
ConfiguredFailoverProxyProvider.class.getName());
conf.set("fs.defaultFS", "hdfs://" + logicalName);
}
-
public static String getLogicalHostname(MiniDFSCluster cluster) {
return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId());
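
To make the key layout concrete, here is a self-contained sketch of what the generalized setFailoverConfigurations would write for three addresses, using a plain Map in place of a Hadoop Configuration. The key prefixes mirror dfs.namenode.rpc-address and dfs.ha.namenodes; the host names and nameservice "ns1" are made up for illustration.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FailoverConfSketch {
  public static void main(String[] args) {
    List<String> addresses = Arrays.asList(
        "hdfs://host1:8020", "hdfs://host2:8020", "hdfs://host3:8020");
    Map<String, String> conf = new LinkedHashMap<String, String>();
    List<String> nnids = new ArrayList<String>();
    int i = 0;
    for (String address : addresses) {
      String nnId = "nn" + (i + 1); // nn1, nn2, nn3, ... as in the patch
      nnids.add(nnId);
      conf.put("dfs.namenode.rpc-address.ns1." + nnId, address);
      i++;
    }
    conf.put("dfs.ha.namenodes.ns1", String.join(",", nnids));
    for (Map.Entry<String, String> e : conf.entrySet()) {
      System.out.println(e.getKey() + " = " + e.getValue());
    }
  }
}
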
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
index 7abc502..16dc766 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
@@ -46,37 +46,47 @@ import com.google.common.collect.ImmutableList;
public class TestBootstrapStandby {
private static final Log LOG = LogFactory.getLog(TestBootstrapStandby.class);
-
+
+ private static final int maxNNCount = 3;
+ private static final int STARTING_PORT = 20000;
+
private MiniDFSCluster cluster;
private NameNode nn0;
-
+
@Before
public void setupCluster() throws IOException {
Configuration conf = new Configuration();
- MiniDFSNNTopology topology = new MiniDFSNNTopology()
- .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
- .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(20001))
- .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(20002)));
-
+ // duplicates MiniQJMHACluster#createDefaultTopology, but we don't want to cross
+ // test dependencies or munge too much code to support sharing it correctly
+ MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf("ns1");
+ for (int i = 0; i < maxNNCount; i++) {
+ nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i).setHttpPort(STARTING_PORT + i + 1));
+ }
+
+ MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(nameservice);
+
cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(topology)
- .numDataNodes(0)
- .build();
+ .nnTopology(topology)
+ .numDataNodes(0)
+ .build();
cluster.waitActive();
-
+
nn0 = cluster.getNameNode(0);
cluster.transitionToActive(0);
- cluster.shutdownNameNode(1);
+ // shutdown the other NNs
+ for (int i = 1; i < maxNNCount; i++) {
+ cluster.shutdownNameNode(i);
+ }
}
-
+
@After
public void shutdownCluster() {
if (cluster != null) {
cluster.shutdown();
}
}
-
+
/**
* Test for the base success case. The primary NN
* hasn't made any checkpoints, and we copy the fsimage_0
@@ -85,30 +95,29 @@ public class TestBootstrapStandby {
@Test
public void testSuccessfulBaseCase() throws Exception {
removeStandbyNameDirs();
-
- try {
- cluster.restartNameNode(1);
- fail("Did not throw");
- } catch (IOException ioe) {
- GenericTestUtils.assertExceptionContains(
- "storage directory does not exist or is not accessible",
- ioe);
+
+ // skip the first NN; it's already up
+ for (int index = 1; index < maxNNCount; index++) {
+ try {
+ cluster.restartNameNode(index);
+ fail("Did not throw");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "storage directory does not exist or is not accessible", ioe);
+ }
+
+ int rc = BootstrapStandby.run(new String[] { "-nonInteractive" },
+ cluster.getConfiguration(index));
+ assertEquals(0, rc);
+
+ // Should have copied over the namespace from the active
+ FSImageTestUtil.assertNNHasCheckpoints(cluster, index, ImmutableList.of(0));
}
-
- int rc = BootstrapStandby.run(
- new String[]{"-nonInteractive"},
- cluster.getConfiguration(1));
- assertEquals(0, rc);
-
- // Should have copied over the namespace from the active
- FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
- ImmutableList.of(0));
- FSImageTestUtil.assertNNFilesMatch(cluster);
- // We should now be able to start the standby successfully.
- cluster.restartNameNode(1);
+ // We should now be able to start the standbys successfully.
+ restartNameNodesFromIndex(1);
}
-
+
/**
* Test for downloading a checkpoint made at a later checkpoint
* from the active.
@@ -123,21 +132,21 @@ public class TestBootstrapStandby {
NameNodeAdapter.saveNamespace(nn0);
NameNodeAdapter.leaveSafeMode(nn0);
long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
- .getFSImage().getMostRecentCheckpointTxId();
+ .getFSImage().getMostRecentCheckpointTxId();
assertEquals(6, expectedCheckpointTxId);
- int rc = BootstrapStandby.run(
- new String[]{"-force"},
- cluster.getConfiguration(1));
- assertEquals(0, rc);
-
- // Should have copied over the namespace from the active
- FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
- ImmutableList.of((int)expectedCheckpointTxId));
+ for (int i = 1; i < maxNNCount; i++) {
+ assertEquals(0, forceBootstrap(i));
+
+ // Should have copied over the namespace from the active
+ LOG.info("Checking namenode: " + i);
+ FSImageTestUtil.assertNNHasCheckpoints(cluster, i,
+ ImmutableList.of((int) expectedCheckpointTxId));
+ }
FSImageTestUtil.assertNNFilesMatch(cluster);
// We should now be able to start the standby successfully.
- cluster.restartNameNode(1);
+ restartNameNodesFromIndex(1);
}
/**
@@ -147,36 +156,40 @@ public class TestBootstrapStandby {
@Test
public void testSharedEditsMissingLogs() throws Exception {
removeStandbyNameDirs();
-
+
CheckpointSignature sig = nn0.getRpcServer().rollEditLog();
assertEquals(3, sig.getCurSegmentTxId());
-
+
// Should have created edits_1-2 in shared edits dir
- URI editsUri = cluster.getSharedEditsDir(0, 1);
+ URI editsUri = cluster.getSharedEditsDir(0, maxNNCount - 1);
File editsDir = new File(editsUri);
- File editsSegment = new File(new File(editsDir, "current"),
+ File currentDir = new File(editsDir, "current");
+ File editsSegment = new File(currentDir,
NNStorage.getFinalizedEditsFileName(1, 2));
GenericTestUtils.assertExists(editsSegment);
+ GenericTestUtils.assertExists(currentDir);
// Delete the segment.
assertTrue(editsSegment.delete());
-
+
// Trying to bootstrap standby should now fail since the edit
// logs aren't available in the shared dir.
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(BootstrapStandby.class));
try {
- int rc = BootstrapStandby.run(
- new String[]{"-force"},
- cluster.getConfiguration(1));
- assertEquals(BootstrapStandby.ERR_CODE_LOGS_UNAVAILABLE, rc);
+ assertEquals(BootstrapStandby.ERR_CODE_LOGS_UNAVAILABLE, forceBootstrap(1));
} finally {
logs.stopCapturing();
}
GenericTestUtils.assertMatches(logs.getOutput(),
"FATAL.*Unable to read transaction ids 1-3 from the configured shared");
}
-
+
+ /**
+ * Show that bootstrapping will fail on a given NameNode if its directories already exist. It's
+ * not run across all the NNs because it tests state that is local to each node.
+ * @throws Exception on unexpected failure
+ */
@Test
public void testStandbyDirsAlreadyExist() throws Exception {
// Should not pass since standby dirs exist, force not given
@@ -186,12 +199,9 @@ public class TestBootstrapStandby {
assertEquals(BootstrapStandby.ERR_CODE_ALREADY_FORMATTED, rc);
// Should pass with -force
- rc = BootstrapStandby.run(
- new String[]{"-force"},
- cluster.getConfiguration(1));
- assertEquals(0, rc);
+ assertEquals(0, forceBootstrap(1));
}
-
+
/**
* Test that, even if the other node is not active, we are able
* to bootstrap standby from it.
@@ -199,18 +209,44 @@ public class TestBootstrapStandby {
@Test(timeout=30000)
public void testOtherNodeNotActive() throws Exception {
cluster.transitionToStandby(0);
- int rc = BootstrapStandby.run(
- new String[]{"-force"},
- cluster.getConfiguration(1));
- assertEquals(0, rc);
+ assertSuccessfulBootstrapFromIndex(1);
}
private void removeStandbyNameDirs() {
- for (URI u : cluster.getNameDirs(1)) {
- assertTrue(u.getScheme().equals("file"));
- File dir = new File(u.getPath());
- LOG.info("Removing standby dir " + dir);
- assertTrue(FileUtil.fullyDelete(dir));
+ for (int i = 1; i < maxNNCount; i++) {
+ for (URI u : cluster.getNameDirs(i)) {
+ assertTrue(u.getScheme().equals("file"));
+ File dir = new File(u.getPath());
+ LOG.info("Removing standby dir " + dir);
+ assertTrue(FileUtil.fullyDelete(dir));
+ }
+ }
+ }
+
+ private void restartNameNodesFromIndex(int start) throws IOException {
+ for (int i = start; i < maxNNCount; i++) {
+ // We should now be able to start the standby successfully.
+ cluster.restartNameNode(i, false);
+ }
+
+ cluster.waitClusterUp();
+ cluster.waitActive();
+ }
+
+ /**
+ * Force bootstrapping on a namenode
+ * @param i index of the namenode to bootstrap
+ * @return exit code
+ * @throws Exception on unexpected failure
+ */
+ private int forceBootstrap(int i) throws Exception {
+ return BootstrapStandby.run(new String[] { "-force" },
+ cluster.getConfiguration(i));
+ }
+
+ private void assertSuccessfulBootstrapFromIndex(int start) throws Exception {
+ for (int i = start; i < maxNNCount; i++) {
+ assertEquals(0, forceBootstrap(i));
}
}
-}
+}
\ No newline at end of file
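
A quick throwaway check of the port layout implied by the setup above: with STARTING_PORT = 20000 and maxNNCount = 3, each NNConf "nn<i>" is given HTTP port STARTING_PORT + i + 1. This is not Hadoop code, just the arithmetic:

public class PortLayout {
  public static void main(String[] args) {
    final int STARTING_PORT = 20000;
    final int maxNNCount = 3;
    for (int i = 0; i < maxNNCount; i++) {
      // matches nameservice.addNN(new NNConf("nn" + i).setHttpPort(...))
      System.out.println("nn" + i + " -> http port " + (STARTING_PORT + i + 1));
    }
    // nn0 -> 20001, nn1 -> 20002, nn2 -> 20003
  }
}
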
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java
index ca8f563..db9a2de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java
@@ -52,7 +52,8 @@ public class TestBootstrapStandbyWithQJM {
private MiniDFSCluster cluster;
private MiniJournalCluster jCluster;
-
+ private int nnCount = 3;
+
@Before
public void setup() throws Exception {
Configuration conf = new Configuration();
@@ -62,7 +63,8 @@ public class TestBootstrapStandbyWithQJM {
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
- MiniQJMHACluster miniQjmHaCluster = new MiniQJMHACluster.Builder(conf).build();
+ MiniQJMHACluster miniQjmHaCluster =
+ new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
cluster = miniQjmHaCluster.getDfsCluster();
jCluster = miniQjmHaCluster.getJournalCluster();
@@ -90,18 +92,7 @@ public class TestBootstrapStandbyWithQJM {
public void testBootstrapStandbyWithStandbyNN() throws Exception {
// make the first NN in standby state
cluster.transitionToStandby(0);
- Configuration confNN1 = cluster.getConfiguration(1);
-
- // shut down nn1
- cluster.shutdownNameNode(1);
-
- int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
- assertEquals(0, rc);
-
- // Should have copied over the namespace from the standby
- FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
- ImmutableList.of(0));
- FSImageTestUtil.assertNNFilesMatch(cluster);
+ bootstrapStandbys();
}
/** BootstrapStandby when the existing NN is active */
@@ -109,17 +100,23 @@ public class TestBootstrapStandbyWithQJM {
public void testBootstrapStandbyWithActiveNN() throws Exception {
// make the first NN in active state
cluster.transitionToActive(0);
- Configuration confNN1 = cluster.getConfiguration(1);
-
- // shut down nn1
- cluster.shutdownNameNode(1);
-
- int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
- assertEquals(0, rc);
-
- // Should have copied over the namespace from the standby
- FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
- ImmutableList.of(0));
+ bootstrapStandbys();
+ }
+
+ private void bootstrapStandbys() throws Exception {
+ // shut down and bootstrap every NN except the first (start at 1, not 0)
+ for (int i = 1; i < nnCount; i++) {
+ Configuration otherNNConf = cluster.getConfiguration(i);
+
+ // shut down other nn
+ cluster.shutdownNameNode(i);
+
+ int rc = BootstrapStandby.run(new String[] { "-force" }, otherNNConf);
+ assertEquals(0, rc);
+
+ // Should have copied over the namespace from the standby
+ FSImageTestUtil.assertNNHasCheckpoints(cluster, i, ImmutableList.of(0));
+ }
FSImageTestUtil.assertNNFilesMatch(cluster);
}
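
The shutdown/bootstrap/verify loop in bootstrapStandbys() generalizes to any NN count. Below is a dependency-free skeleton of that control flow; the ClusterOps interface is hypothetical, standing in for MiniDFSCluster plus BootstrapStandby so the loop can be shown without the Hadoop test jars.

public class BootstrapLoopSketch {
  interface ClusterOps {
    void shutdownNameNode(int i);
    int bootstrap(int i);            // stands in for BootstrapStandby.run
    void assertHasCheckpoint(int i);
  }

  static void bootstrapStandbys(ClusterOps ops, int nnCount) {
    // NN 0 stays up as the source; every other NN is re-seeded from it
    for (int i = 1; i < nnCount; i++) {
      ops.shutdownNameNode(i);
      if (ops.bootstrap(i) != 0) {
        throw new AssertionError("bootstrap failed for nn" + i);
      }
      ops.assertHasCheckpoint(i);
    }
  }

  public static void main(String[] args) {
    bootstrapStandbys(new ClusterOps() {
      public void shutdownNameNode(int i) { System.out.println("stop nn" + i); }
      public int bootstrap(int i) { System.out.println("bootstrap nn" + i); return 0; }
      public void assertHasCheckpoint(int i) { System.out.println("verified nn" + i); }
    }, 3);
  }
}
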
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
index e7cba75..9164582 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
@@ -107,6 +107,7 @@ public class TestDNFencingWithReplication {
@Test
public void testFencingStress() throws Exception {
HAStressTestHarness harness = new HAStressTestHarness();
+ harness.setNumberOfNameNodes(3);
harness.conf.setInt(
DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
harness.conf.setInt(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
index 8c61c92..aea4f87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
@@ -113,7 +113,12 @@ public class TestEditLogTailer {
public void testNN1TriggersLogRolls() throws Exception {
testStandbyTriggersLogRolls(1);
}
-
+
+ @Test
+ public void testNN2TriggersLogRolls() throws Exception {
+ testStandbyTriggersLogRolls(2);
+ }
+
private static void testStandbyTriggersLogRolls(int activeIndex)
throws Exception {
Configuration conf = new Configuration();
@@ -125,7 +130,8 @@ public class TestEditLogTailer {
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10031))
- .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10032)));
+ .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10032))
+ .addNN(new MiniDFSNNTopology.NNConf("nn3").setIpcPort(10033)));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
@@ -145,7 +151,7 @@ public class TestEditLogTailer {
private static void waitForLogRollInSharedDir(MiniDFSCluster cluster,
long startTxId) throws Exception {
- URI sharedUri = cluster.getSharedEditsDir(0, 1);
+ URI sharedUri = cluster.getSharedEditsDir(0, 2);
File sharedDir = new File(sharedUri.getPath(), "current");
final File expectedLog = new File(sharedDir,
NNStorage.getInProgressEditsFileName(startTxId));
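
waitForLogRollInSharedDir (partially shown above) polls the shared edits dir until the expected in-progress segment appears. A self-contained sketch of that poll-with-deadline pattern follows; the file name, sleep interval, and timeout here are illustrative only.

import java.io.File;
import java.io.IOException;

public class WaitForFileSketch {
  static void waitForFile(File expected, long timeoutMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (expected.exists()) {
        return;                      // the expected edits segment appeared
      }
      Thread.sleep(100);             // poll rather than busy-wait
    }
    throw new AssertionError("timed out waiting for " + expected);
  }

  public static void main(String[] args)
      throws IOException, InterruptedException {
    File f = File.createTempFile("edits_inprogress_", null);
    f.deleteOnExit();
    waitForFile(f, 2000);            // returns immediately: the file exists
    System.out.println("found " + f);
  }
}
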
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
index 151e7d3..116079a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
@@ -56,10 +56,11 @@ public class TestFailoverWithBlockTokensEnabled {
private static final Path TEST_PATH = new Path("/test-path");
private static final String TEST_DATA = "very important text";
-
+ private static final int numNNs = 3;
+
private Configuration conf;
private MiniDFSCluster cluster;
-
+
@Before
public void startCluster() throws IOException {
conf = new Configuration();
@@ -67,7 +68,7 @@ public class TestFailoverWithBlockTokensEnabled {
// Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .nnTopology(MiniDFSNNTopology.simpleHATopology(numNNs))
.numDataNodes(1)
.build();
}
@@ -78,33 +79,41 @@ public class TestFailoverWithBlockTokensEnabled {
cluster.shutdown();
}
}
-
+
@Test
public void ensureSerialNumbersNeverOverlap() {
BlockTokenSecretManager btsm1 = cluster.getNamesystem(0).getBlockManager()
.getBlockTokenSecretManager();
BlockTokenSecretManager btsm2 = cluster.getNamesystem(1).getBlockManager()
.getBlockTokenSecretManager();
-
- btsm1.setSerialNo(0);
- btsm2.setSerialNo(0);
- assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
-
- btsm1.setSerialNo(Integer.MAX_VALUE);
- btsm2.setSerialNo(Integer.MAX_VALUE);
- assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
-
- btsm1.setSerialNo(Integer.MIN_VALUE);
- btsm2.setSerialNo(Integer.MIN_VALUE);
- assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
-
- btsm1.setSerialNo(Integer.MAX_VALUE / 2);
- btsm2.setSerialNo(Integer.MAX_VALUE / 2);
- assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
+ BlockTokenSecretManager btsm3 = cluster.getNamesystem(2).getBlockManager()
+ .getBlockTokenSecretManager();
+
+ setAndCheckSerialNumber(0, btsm1, btsm2, btsm3);
+ setAndCheckSerialNumber(Integer.MAX_VALUE, btsm1, btsm2, btsm3);
+ setAndCheckSerialNumber(Integer.MIN_VALUE, btsm1, btsm2, btsm3);
+ setAndCheckSerialNumber(Integer.MAX_VALUE / 2, btsm1, btsm2, btsm3);
+ setAndCheckSerialNumber(Integer.MIN_VALUE / 2, btsm1, btsm2, btsm3);
+ setAndCheckSerialNumber(Integer.MAX_VALUE / 3, btsm1, btsm2, btsm3);
+ setAndCheckSerialNumber(Integer.MIN_VALUE / 3, btsm1, btsm2, btsm3);
+ }
+
+ private void setAndCheckSerialNumber(int serialNumber, BlockTokenSecretManager... btsms) {
+ for (BlockTokenSecretManager btsm : btsms) {
+ btsm.setSerialNo(serialNumber);
+ }
- btsm1.setSerialNo(Integer.MIN_VALUE / 2);
- btsm2.setSerialNo(Integer.MIN_VALUE / 2);
- assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
+ for (int i = 0; i < btsms.length; i++) {
+ for (int j = 0; j < btsms.length; j++) {
+ if (j == i) {
+ continue;
+ }
+ int first = btsms[i].getSerialNoForTesting();
+ int second = btsms[j].getSerialNoForTesting();
+ assertFalse("Overlap found for set serial number (" + serialNumber + ") is " + i + ": "
+ + first + " == " + j + ": " + second, first == second);
+ }
+ }
}
@Test