Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/29 20:16:38 UTC
svn commit: r1097905 [9/14] - in /hadoop/hdfs/trunk: ./ bin/
src/c++/libhdfs/ src/contrib/hdfsproxy/
src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/ja...
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Fri Apr 29 18:16:32 2011
@@ -50,7 +50,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -238,7 +238,7 @@ public class DFSTestUtil {
public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster,
Path file, int blockNo) throws IOException {
DFSClient client = new DFSClient(new InetSocketAddress("localhost",
- cluster.getNameNodePort()), cluster.getConfiguration());
+ cluster.getNameNodePort()), cluster.getConfiguration(0));
LocatedBlocks blocks;
try {
blocks = client.getNamenode().getBlockLocations(
@@ -254,7 +254,7 @@ public class DFSTestUtil {
* the requested number of racks, with the requested number of
* replicas, and the requested number of replicas still needed.
*/
- public static void waitForReplication(MiniDFSCluster cluster, Block b,
+ public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
int racks, int replicas, int neededReplicas)
throws IOException, TimeoutException, InterruptedException {
int curRacks = 0;
@@ -265,7 +265,7 @@ public class DFSTestUtil {
do {
Thread.sleep(1000);
- int []r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b);
+ int []r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b.getLocalBlock());
curRacks = r[0];
curReplicas = r[1];
curNeededReplicas = r[2];
@@ -288,11 +288,11 @@ public class DFSTestUtil {
* given block in the file contains the given number of corrupt replicas.
*/
public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
- Path file, Block b, int corruptRepls)
+ Path file, ExtendedBlock b, int corruptRepls)
throws IOException, TimeoutException {
int count = 0;
final int ATTEMPTS = 20;
- int repls = ns.numCorruptReplicas(b);
+ int repls = ns.numCorruptReplicas(b.getLocalBlock());
while (repls != corruptRepls && count < ATTEMPTS) {
try {
IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
@@ -301,7 +301,7 @@ public class DFSTestUtil {
// Swallow exceptions
}
System.out.println("Waiting for "+corruptRepls+" corrupt replicas");
- repls = ns.numCorruptReplicas(b);
+ repls = ns.numCorruptReplicas(b.getLocalBlock());
count++;
}
if (count == ATTEMPTS) {
@@ -342,11 +342,11 @@ public class DFSTestUtil {
* Returns the index of the first datanode which has a copy
* of the given block, or -1 if no such datanode exists.
*/
- public static int firstDnWithBlock(MiniDFSCluster cluster, Block b)
+ public static int firstDnWithBlock(MiniDFSCluster cluster, ExtendedBlock b)
throws IOException {
int numDatanodes = cluster.getDataNodes().size();
for (int i = 0; i < numDatanodes; i++) {
- String blockContent = cluster.readBlockOnDataNode(i, b.getBlockName());
+ String blockContent = cluster.readBlockOnDataNode(i, b);
if (blockContent != null) {
return i;
}
@@ -405,7 +405,7 @@ public class DFSTestUtil {
files = null;
}
- public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
+ public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException {
DFSDataInputStream in =
(DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
in.readByte();
@@ -553,7 +553,7 @@ public class DFSTestUtil {
}
/** For {@link TestTransferRbw} */
- public static DataTransferProtocol.Status transferRbw(final Block b,
+ public static DataTransferProtocol.Status transferRbw(final ExtendedBlock b,
final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
assertEquals(2, datanodes.length);
final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
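The net effect of the DFSTestUtil changes is that helpers now carry the block pool id inside an ExtendedBlock and unwrap it with getLocalBlock() only at namenode-facing calls. A hedged sketch of the resulting calling pattern (illustrative only, not part of the commit; cluster, fs and file are assumed to be set up by the surrounding test):

    // Sketch: new ExtendedBlock-based helper signatures.
    ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, file);
    // Namenode-side counters still take the pool-local block:
    int corrupt = cluster.getNamesystem().numCorruptReplicas(b.getLocalBlock());
    // Datanode-side helpers take the full ExtendedBlock:
    int dnIndex = DFSTestUtil.firstDnWithBlock(cluster, b); // -1 if no replica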
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Apr 29 18:16:32 2011
@@ -36,8 +36,6 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
@@ -46,15 +44,20 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -65,6 +68,7 @@ import org.apache.hadoop.security.Refres
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
@@ -73,16 +77,18 @@ import org.apache.hadoop.util.ToolRunner
* The data directories for non-simulated DFS are under the testing directory.
* For simulated data nodes, no underlying fs storage is used.
*/
-@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "Hive", "MapReduce", "Pig"})
-@InterfaceStability.Unstable
public class MiniDFSCluster {
+ private static final String NAMESERVICE_ID_PREFIX = "nameserviceId";
+ private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class);
+
/**
* Class to construct instances of MiniDFSClusters with specific options.
*/
public static class Builder {
private int nameNodePort = 0;
private final Configuration conf;
+ private int numNameNodes = 1;
private int numDataNodes = 1;
private boolean format = true;
private boolean manageNameDfsDirs = true;
@@ -91,15 +97,25 @@ public class MiniDFSCluster {
private String[] racks = null;
private String [] hosts = null;
private long [] simulatedCapacities = null;
- // wait until namenode has left safe mode?
+ private String clusterId = null;
private boolean waitSafeMode = true;
private boolean setupHostsFile = false;
+ private boolean federation = false;
public Builder(Configuration conf) {
this.conf = conf;
}
/**
+ * Default: false - non-federated cluster
+ * @param val if true, create a federated cluster
+ * @return Builder object
+ */
+ public Builder federation(boolean val) {
+ this.federation = val;
+ return this;
+ }
+ /**
* Default: 0
*/
public Builder nameNodePort(int val) {
@@ -110,6 +126,14 @@ public class MiniDFSCluster {
/**
* Default: 1
*/
+ public Builder numNameNodes(int val) {
+ this.numNameNodes = val;
+ return this;
+ }
+
+ /**
+ * Default: 1
+ */
public Builder numDataNodes(int val) {
this.numDataNodes = val;
return this;
@@ -180,6 +204,14 @@ public class MiniDFSCluster {
}
/**
+ * Default: null
+ */
+ public Builder clusterId(String cid) {
+ this.clusterId = cid;
+ return this;
+ }
+
+ /**
* Default: false
* When true the hosts file/include file for the cluster is setup
*/
@@ -200,6 +232,12 @@ public class MiniDFSCluster {
* Used by builder to create and return an instance of MiniDFSCluster
*/
private MiniDFSCluster(Builder builder) throws IOException {
+ LOG.info("starting cluster with " + builder.numNameNodes + " namenodes.");
+ nameNodes = new NameNodeInfo[builder.numNameNodes];
+ // try to determine if in federation mode
+ if(builder.numNameNodes > 1)
+ builder.federation = true;
+
initMiniDFSCluster(builder.nameNodePort,
builder.conf,
builder.numDataNodes,
@@ -210,8 +248,10 @@ public class MiniDFSCluster {
builder.racks,
builder.hosts,
builder.simulatedCapacities,
+ builder.clusterId,
builder.waitSafeMode,
- builder.setupHostsFile);
+ builder.setupHostsFile,
+ builder.federation);
}
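Because numNameNodes > 1 implies federation, a multi-namenode mini cluster can now be requested straight from the Builder. A minimal sketch under default configuration (illustrative; not part of this commit):

    // Sketch: hypothetical test setup using the new Builder options.
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numNameNodes(2)   // > 1 switches the builder into federation mode
        .numDataNodes(3)
        .build();
    try {
      assertEquals(2, cluster.getNumNameNodes());
    } finally {
      cluster.shutdown();
    }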
public class DataNodeProperties {
@@ -225,27 +265,35 @@ public class MiniDFSCluster {
this.dnArgs = args;
}
}
- private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class);
- private URI myUri = null;
private Configuration conf;
- private NameNode nameNode;
+ private NameNodeInfo[] nameNodes;
private int numDataNodes;
private ArrayList<DataNodeProperties> dataNodes =
new ArrayList<DataNodeProperties>();
private File base_dir;
private File data_dir;
-
+ private boolean federation = false;
private boolean waitSafeMode = true;
- public final static String FINALIZED_DIR_NAME = "/current/finalized/";
-
+ /**
+ * Stores the information related to a namenode in the cluster
+ */
+ static class NameNodeInfo {
+ final NameNode nameNode;
+ final Configuration conf;
+ NameNodeInfo(NameNode nn, Configuration conf) {
+ this.nameNode = nn;
+ this.conf = conf;
+ }
+ }
/**
* This null constructor is used only when wishing to start a data node cluster
* without a name node (ie when the name node is started elsewhere).
*/
public MiniDFSCluster() {
+ nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
}
/**
@@ -406,21 +454,22 @@ public class MiniDFSCluster {
StartupOption operation,
String[] racks, String hosts[],
long[] simulatedCapacities) throws IOException {
- initMiniDFSCluster(nameNodePort, conf, numDataNodes, format,
+ this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
+ initMiniDFSCluster(nameNodePort, conf, 1, format,
manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
- simulatedCapacities, true, false);
+ simulatedCapacities, null, true, false, false);
}
private void initMiniDFSCluster(int nameNodePort, Configuration conf,
int numDataNodes, boolean format, boolean manageNameDfsDirs,
boolean manageDataDfsDirs, StartupOption operation, String[] racks,
- String[] hosts, long[] simulatedCapacities, boolean waitSafeMode,
- boolean setupHostsFile)
- throws IOException {
+ String[] hosts, long[] simulatedCapacities, String clusterId,
+ boolean waitSafeMode, boolean setupHostsFile, boolean federation)
+ throws IOException {
this.conf = conf;
base_dir = new File(getBaseDirectory());
data_dir = new File(base_dir, "data");
-
+ this.federation = federation;
this.waitSafeMode = waitSafeMode;
// use alternate RPC engine if spec'd
@@ -444,73 +493,179 @@ public class MiniDFSCluster {
conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
false);
}
-
- // Setup the NameNode configuration
- FileSystem.setDefaultUri(conf, "hdfs://localhost:"+ Integer.toString(nameNodePort));
- conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
- if (manageNameDfsDirs) {
- conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
- fileAsURI(new File(base_dir, "name1"))+","+
- fileAsURI(new File(base_dir, "name2")));
- conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
- fileAsURI(new File(base_dir, "namesecondary1"))+","+
- fileAsURI(new File(base_dir, "namesecondary2")));
- }
int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, Math.min(replication, numDataNodes));
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 3); // 3 second
+ conf.setClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ StaticMapping.class, DNSToSwitchMapping.class);
+
+ Collection<String> nameserviceIds = DFSUtil.getNameServiceIds(conf);
+ if(nameserviceIds.size() > 1)
+ federation = true;
+
+ if (!federation) {
+ conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "127.0.0.1:" + nameNodePort);
+ conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
+ NameNode nn = createNameNode(0, conf, numDataNodes, manageNameDfsDirs,
+ format, operation, clusterId);
+ nameNodes[0] = new NameNodeInfo(nn, conf);
+ FileSystem.setDefaultUri(conf, getURI(0));
+ } else {
+ if (nameserviceIds.isEmpty()) {
+ for (int i = 0; i < nameNodes.length; i++) {
+ nameserviceIds.add(NAMESERVICE_ID_PREFIX + i);
+ }
+ }
+ initFederationConf(conf, nameserviceIds, numDataNodes, nameNodePort);
+ createFederationNamenodes(conf, nameserviceIds, manageNameDfsDirs, format,
+ operation, clusterId);
+ }
- // Format and clean out DataNode directories
if (format) {
if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
throw new IOException("Cannot remove data directory: " + data_dir);
}
- NameNode.format(conf);
}
- // Start the NameNode
- String[] args = (operation == null ||
- operation == StartupOption.FORMAT ||
- operation == StartupOption.REGULAR) ?
- new String[] {} : new String[] {operation.getName()};
- conf.setClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
- StaticMapping.class, DNSToSwitchMapping.class);
- nameNode = NameNode.createNameNode(args, conf);
-
// Start the DataNodes
startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
hosts, simulatedCapacities, setupHostsFile);
waitClusterUp();
-
//make sure ProxyUsers uses the latest conf
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}
+ /** Initialize configuration for federated cluster */
+ private static void initFederationConf(Configuration conf,
+ Collection<String> nameserviceIds, int numDataNodes, int nnPort) {
+ String nameserviceIdList = "";
+ for (String nameserviceId : nameserviceIds) {
+ // Create comma separated list of nameserviceIds
+ if (nameserviceIdList.length() > 0) {
+ nameserviceIdList += ",";
+ }
+ nameserviceIdList += nameserviceId;
+ initFederatedNamenodeAddress(conf, nameserviceId, nnPort);
+ nnPort = nnPort == 0 ? 0 : nnPort + 2;
+ }
+ conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nameserviceIdList);
+ }
+
+ /* For federated namenode initialize the address:port */
+ private static void initFederatedNamenodeAddress(Configuration conf,
+ String nameserviceId, int nnPort) {
+ // Set nameserviceId specific key
+ String key = DFSUtil.getNameServiceIdKey(
+ DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId);
+ conf.set(key, "127.0.0.1:0");
+
+ key = DFSUtil.getNameServiceIdKey(
+ DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId);
+ conf.set(key, "127.0.0.1:" + nnPort);
+ }
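For two nameservices and a caller-supplied nnPort of 9000, the two methods above leave roughly this configuration behind (literal key names assumed from the DFSConfigKeys constants of this era; RPC ports step by 2 per nameservice, and 0 stays 0 for ephemeral ports):

    dfs.federation.nameservices = nameserviceId0,nameserviceId1
    dfs.namenode.rpc-address.nameserviceId0 = 127.0.0.1:9000
    dfs.namenode.http-address.nameserviceId0 = 127.0.0.1:0
    dfs.namenode.rpc-address.nameserviceId1 = 127.0.0.1:9002
    dfs.namenode.http-address.nameserviceId1 = 127.0.0.1:0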
+
+ private void createFederationNamenodes(Configuration conf,
+ Collection<String> nameserviceIds, boolean manageNameDfsDirs,
+ boolean format, StartupOption operation, String clusterId)
+ throws IOException {
+ // Create namenodes in the cluster
+ int nnCounter = 0;
+ for (String nameserviceId : nameserviceIds) {
+ createFederatedNameNode(nnCounter++, conf, numDataNodes, manageNameDfsDirs,
+ format, operation, clusterId, nameserviceId);
+ }
+ }
+
+ private NameNode createNameNode(int nnIndex, Configuration conf,
+ int numDataNodes, boolean manageNameDfsDirs, boolean format,
+ StartupOption operation, String clusterId)
+ throws IOException {
+ if (manageNameDfsDirs) {
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
+ fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
+ conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+ fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
+ fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
+ }
+
+ // Format and clean out DataNode directories
+ if (format) {
+ GenericTestUtils.formatNamenode(conf);
+ }
+ if (operation == StartupOption.UPGRADE){
+ operation.setClusterId(clusterId);
+ }
+
+ // Start the NameNode
+ String[] args = (operation == null ||
+ operation == StartupOption.FORMAT ||
+ operation == StartupOption.REGULAR) ?
+ new String[] {} : new String[] {operation.getName()};
+ return NameNode.createNameNode(args, conf);
+ }
+
+ private void createFederatedNameNode(int nnIndex, Configuration conf,
+ int numDataNodes, boolean manageNameDfsDirs, boolean format,
+ StartupOption operation, String clusterId, String nameserviceId)
+ throws IOException {
+ conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID, nameserviceId);
+ NameNode nn = createNameNode(nnIndex, conf, numDataNodes, manageNameDfsDirs,
+ format, operation, clusterId);
+ conf.set(DFSUtil.getNameServiceIdKey(
+ DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId), NameNode
+ .getHostPortString(nn.getNameNodeAddress()));
+ conf.set(DFSUtil.getNameServiceIdKey(
+ DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId), NameNode
+ .getHostPortString(nn.getHttpAddress()));
+ DFSUtil.setGenericConf(conf, nameserviceId,
+ DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+ nameNodes[nnIndex] = new NameNodeInfo(nn, new Configuration(conf));
+ }
+
private void setRpcEngine(Configuration conf, Class<?> protocol, Class<?> engine) {
conf.setClass("rpc.engine."+protocol.getName(), engine, Object.class);
}
/**
- *
- * @return URI of this MiniDFSCluster
+ * @return URI of the given namenode in MiniDFSCluster
*/
- public URI getURI() {
- return myUri;
+ public URI getURI(int nnIndex) {
+ InetSocketAddress addr = nameNodes[nnIndex].nameNode.getNameNodeAddress();
+ String hostPort = NameNode.getHostPortString(addr);
+ URI uri = null;
+ try {
+ uri = new URI("hdfs://" + hostPort);
+ } catch (URISyntaxException e) {
+ NameNode.LOG.warn("unexpected URISyntaxException: " + e );
+ }
+ return uri;
}
/**
- * Get configuration.
- * @return Configuration of this MiniDFSCluster
+ * @return Configuration for the given namenode
*/
- public Configuration getConfiguration() {
- return conf;
+ public Configuration getConfiguration(int nnIndex) {
+ return nameNodes[nnIndex].conf;
}
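Callers of the old zero-argument accessors now pass a namenode index, as the DFSTestUtil hunk above already does with getConfiguration(0). A hedged fragment (cluster is assumed):

    // Sketch: per-namenode accessors after this change.
    URI uri = cluster.getURI(0);                        // hdfs://127.0.0.1:<port>
    Configuration nnConf = cluster.getConfiguration(0); // that namenode's conf
    FileSystem fs = cluster.getFileSystem(1);           // client for a 2nd NN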
/**
- * wait for the cluster to get out of
- * safemode.
+ * wait for the given namenode to get out of safemode.
+ */
+ public void waitNameNodeUp(int nnIndex) {
+ while (!isNameNodeUp(nnIndex)) {
+ try {
+ LOG.warn("Waiting for namenode at " + nnIndex + " to start...");
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ /**
+ * wait for the cluster to get out of safemode.
*/
public void waitClusterUp() {
if (numDataNodes > 0) {
@@ -593,18 +748,6 @@ public class MiniDFSCluster {
// If minicluster's name node is null assume that the conf has been
// set with the right address:port of the name node.
//
- if (nameNode != null) { // set conf from the name node
- InetSocketAddress nnAddr = nameNode.getNameNodeAddress();
- int nameNodePort = nnAddr.getPort();
- try {
- myUri = new URI("hdfs://"+ nnAddr.getHostName() + ":" +
- Integer.toString(nameNodePort));
- } catch (URISyntaxException e) {
- throw new IOException("Couldn't parse own URI", e);
- }
- FileSystem.setDefaultUri(conf, myUri);
- }
-
if (racks != null && numDataNodes > racks.length ) {
throw new IllegalArgumentException( "The length of racks [" + racks.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
@@ -615,7 +758,6 @@ public class MiniDFSCluster {
}
//Generate some hostnames if required
if (racks != null && hosts == null) {
- System.out.println("Generating host names for datanodes");
hosts = new String[numDataNodes];
for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
@@ -639,8 +781,8 @@ public class MiniDFSCluster {
// Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile);
if (manageDfsDirs) {
- File dir1 = new File(data_dir, "data"+(2*i+1));
- File dir2 = new File(data_dir, "data"+(2*i+2));
+ File dir1 = getStorageDir(i, 0);
+ File dir2 = getStorageDir(i, 1);
dir1.mkdirs();
dir2.mkdirs();
if (!dir1.isDirectory() || !dir2.isDirectory()) {
@@ -652,7 +794,7 @@ public class MiniDFSCluster {
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
}
if (simulatedCapacities != null) {
- dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
+ dnConf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
simulatedCapacities[i-curDatanodesNum]);
}
@@ -689,7 +831,7 @@ public class MiniDFSCluster {
StaticMapping.addNodeToRack(ipAddr + ":" + port,
racks[i-curDatanodesNum]);
}
- DataNode.runDatanodeDaemon(dn);
+ dn.runDatanodeDaemon();
dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
}
curDatanodesNum += numDataNodes;
@@ -753,6 +895,30 @@ public class MiniDFSCluster {
simulatedCapacities, false);
}
+
+ /**
+ * Finalize the namenode. Block pools corresponding to the namenode are
+ * finalized on the datanode.
+ */
+ private void finalizeNamenode(NameNode nn, Configuration conf) throws Exception {
+ if (nn == null) {
+ throw new IllegalStateException("Attempting to finalize "
+ + "Namenode but it is not running");
+ }
+ ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"});
+ }
+
+ /**
+ * Finalize cluster for the namenode at the given index
+ * @see MiniDFSCluster#finalizeCluster(Configuration)
+ * @param nnIndex
+ * @param conf
+ * @throws Exception
+ */
+ public void finalizeCluster(int nnIndex, Configuration conf) throws Exception {
+ finalizeNamenode(nameNodes[nnIndex].nameNode, nameNodes[nnIndex].conf);
+ }
+
/**
* If the NameNode is running, attempt to finalize a previous upgrade.
* When this method return, the NameNode should be finalized, but
@@ -761,18 +927,32 @@ public class MiniDFSCluster {
* @throws IllegalStateException if the Namenode is not running.
*/
public void finalizeCluster(Configuration conf) throws Exception {
- if (nameNode == null) {
- throw new IllegalStateException("Attempting to finalize "
- + "Namenode but it is not running");
+ for (NameNodeInfo nnInfo : nameNodes) {
+ if (nnInfo == null) {
+ throw new IllegalStateException("Attempting to finalize "
+ + "Namenode but it is not running");
+ }
+ finalizeNamenode(nnInfo.nameNode, nnInfo.conf);
}
- ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"});
+ }
+
+ public int getNumNameNodes() {
+ return nameNodes.length;
}
/**
* Gets the started NameNode. May be null.
*/
public NameNode getNameNode() {
- return nameNode;
+ checkSingleNameNode();
+ return getNameNode(0);
+ }
+
+ /**
+ * Gets the NameNode for the index. May be null.
+ */
+ public NameNode getNameNode(int nnIndex) {
+ return nameNodes[nnIndex].nameNode;
}
/**
@@ -780,7 +960,12 @@ public class MiniDFSCluster {
* @return {@link FSNamesystem} object.
*/
public FSNamesystem getNamesystem() {
- return NameNodeAdapter.getNamesystem(nameNode);
+ checkSingleNameNode();
+ return NameNodeAdapter.getNamesystem(nameNodes[0].nameNode);
+ }
+
+ public FSNamesystem getNamesystem(int nnIndex) {
+ return NameNodeAdapter.getNamesystem(nameNodes[nnIndex].nameNode);
}
/**
@@ -808,21 +993,34 @@ public class MiniDFSCluster {
/**
* Gets the rpc port used by the NameNode, because the caller
* supplied port is not necessarily the actual port used.
+ * Assumption: cluster has a single namenode
*/
public int getNameNodePort() {
- return nameNode.getNameNodeAddress().getPort();
+ checkSingleNameNode();
+ return getNameNodePort(0);
}
/**
- * Shut down the servers that are up.
+ * Gets the rpc port used by the NameNode at the given index, because the
+ * caller supplied port is not necessarily the actual port used.
+ */
+ public int getNameNodePort(int nnIndex) {
+ return nameNodes[nnIndex].nameNode.getNameNodeAddress().getPort();
+ }
+
+ /**
+ * Shutdown all the nodes in the cluster.
*/
public void shutdown() {
System.out.println("Shutting down the Mini HDFS Cluster");
shutdownDataNodes();
- if (nameNode != null) {
- nameNode.stop();
- nameNode.join();
- nameNode = null;
+ for (NameNodeInfo nnInfo : nameNodes) {
+ NameNode nameNode = nnInfo.nameNode;
+ if (nameNode != null) {
+ nameNode.stop();
+ nameNode.join();
+ nameNode = null;
+ }
}
}
@@ -840,23 +1038,36 @@ public class MiniDFSCluster {
}
/**
- * Shutdown namenode.
+ * Shutdown all the namenodes.
*/
- public synchronized void shutdownNameNode() {
- if (nameNode != null) {
+ public synchronized void shutdownNameNodes() {
+ for (int i = 0; i < nameNodes.length; i++) {
+ shutdownNameNode(i);
+ }
+ }
+
+ /**
+ * Shutdown the namenode at a given index.
+ */
+ public synchronized void shutdownNameNode(int nnIndex) {
+ NameNode nn = nameNodes[nnIndex].nameNode;
+ if (nn != null) {
System.out.println("Shutting down the namenode");
- nameNode.stop();
- nameNode.join();
- nameNode = null;
+ nn.stop();
+ nn.join();
+ Configuration conf = nameNodes[nnIndex].conf;
+ nameNodes[nnIndex] = new NameNodeInfo(null, conf);
}
}
/**
- * Restart namenode.
+ * Restart namenode at a given index.
*/
- public synchronized void restartNameNode() throws IOException {
- shutdownNameNode();
- nameNode = NameNode.createNameNode(new String[] {}, conf);
+ public synchronized void restartNameNode(int nnIndex) throws IOException {
+ Configuration conf = nameNodes[nnIndex].conf;
+ shutdownNameNode(nnIndex);
+ NameNode nn = NameNode.createNameNode(new String[] {}, conf);
+ nameNodes[nnIndex] = new NameNodeInfo(nn, conf);
waitClusterUp();
System.out.println("Restarted the namenode");
int failedCount = 0;
@@ -884,76 +1095,62 @@ public class MiniDFSCluster {
* @param The index of the datanode
* @param The name of the block
* @throws IOException on error accessing the file for the given block
- * @return The contents of the block file, null if none found
*/
- public String readBlockOnDataNode(int i, String blockName) throws IOException {
- assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
-
- // Each datanode has multiple data dirs, check each
- for (int dn = i*2; dn < i*2+2; dn++) {
- File dataDir = new File(getBaseDirectory() + "data");
- File blockFile = new File(dataDir,
- "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
- if (blockFile.exists()) {
- return DFSTestUtil.readFile(blockFile);
- }
- }
- return null;
- }
-
- /**
- * Corrupt a block on all datanodes.
- *
- * @param The name of the block
- * @throws IOException on error accessing the given block.
- * @return The number of block files corrupted.
- */
- public int corruptBlockOnDataNodes(String blockName) throws IOException {
+ int corruptBlockOnDataNodes(ExtendedBlock block) throws IOException{
int blocksCorrupted = 0;
- for (int i=0; i < dataNodes.size(); i++) {
- if (corruptReplica(blockName, i)) {
+ File[] blockFiles = getAllBlockFiles(block);
+ for (File f : blockFiles) {
+ if (corruptBlock(f)) {
blocksCorrupted++;
}
}
return blocksCorrupted;
}
+ public String readBlockOnDataNode(int i, ExtendedBlock block)
+ throws IOException {
+ assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
+ File blockFile = getBlockFile(i, block);
+ if (blockFile != null && blockFile.exists()) {
+ return DFSTestUtil.readFile(blockFile);
+ }
+ return null;
+ }
+
/**
* Corrupt a block on a particular datanode.
*
- * @param The index of the datanode
- * @param The name of the block
+ * @param i index of the datanode
+ * @param blk the block to be corrupted
* @throws IOException on error accessing the given block or if
* the contents of the block (on the same datanode) differ.
* @return true if a replica was corrupted, false otherwise
+ * Types: delete, write bad data, truncate
*/
- public boolean corruptReplica(String blockName, int i) throws IOException {
- Random random = new Random();
- assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
- int filesCorrupted = 0;
+ public static boolean corruptReplica(int i, ExtendedBlock blk)
+ throws IOException {
+ File blockFile = getBlockFile(i, blk);
+ return corruptBlock(blockFile);
+ }
- // Each datanode has multiple data dirs, check each
- for (int dn = i*2; dn < i*2+2; dn++) {
- File dataDir = new File(getBaseDirectory() + "data");
- File blockFile = new File(dataDir,
- "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
-
- // Corrupt the replica by writing some bytes into a random offset
- if (blockFile.exists()) {
- System.out.println("Corrupting " + blockFile);
- RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
- FileChannel channel = raFile.getChannel();
- String badString = "BADBAD";
- int rand = random.nextInt((int)channel.size()/2);
- raFile.seek(rand);
- raFile.write(badString.getBytes());
- raFile.close();
- filesCorrupted++;
- }
- }
- assert filesCorrupted == 0 || filesCorrupted == 1
- : "Unexpected # block files";
- return filesCorrupted == 1;
+ /*
+ * Corrupt a block on a particular datanode
+ */
+ public static boolean corruptBlock(File blockFile) throws IOException {
+ if (blockFile == null || !blockFile.exists()) {
+ return false;
+ }
+ // Corrupt replica by writing random bytes into replica
+ Random random = new Random();
+ RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+ FileChannel channel = raFile.getChannel();
+ String badString = "BADBAD";
+ int rand = random.nextInt((int)channel.size()/2);
+ raFile.seek(rand);
+ raFile.write(badString.getBytes());
+ raFile.close();
+ LOG.warn("Corrupting the block " + blockFile);
+ return true;
}
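Tests that previously passed a block name and scanned data/dataN/current/finalized by hand now resolve replica files through the block pool layout. A hedged sketch (fs and file are assumed; corruptBlockOnDataNodes is package-private, so the caller lives in org.apache.hadoop.hdfs):

    // Sketch: corrupt one replica, then every replica, of a block.
    ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, file);
    boolean one = MiniDFSCluster.corruptReplica(0, blk); // datanode 0 only
    int all = cluster.corruptBlockOnDataNodes(blk);      // count corrupted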
/*
@@ -966,7 +1163,7 @@ public class MiniDFSCluster {
DataNodeProperties dnprop = dataNodes.remove(i);
DataNode dn = dnprop.datanode;
System.out.println("MiniDFSCluster Stopping DataNode " +
- dn.dnRegistration.getName() +
+ dn.getMachineName() +
" from a total of " + (dataNodes.size() + 1) +
" datanodes.");
dn.shutdown();
@@ -981,7 +1178,12 @@ public class MiniDFSCluster {
int i;
for (i = 0; i < dataNodes.size(); i++) {
DataNode dn = dataNodes.get(i).datanode;
- if (dn.dnRegistration.getName().equals(name)) {
+ // get BP registration
+ DatanodeRegistration dnR =
+ DataNodeTestUtils.getDNRegistrationByMachineName(dn, name);
+ LOG.info("for name=" + name + " found bp=" + dnR +
+ "; with dnMn=" + dn.getMachineName());
+ if(dnR != null) {
break;
}
}
@@ -1065,7 +1267,8 @@ public class MiniDFSCluster {
* Returns true if the NameNode is running and is out of Safe Mode
* or if waiting for safe mode is disabled.
*/
- public boolean isClusterUp() {
+ public boolean isNameNodeUp(int nnIndex) {
+ NameNode nameNode = nameNodes[nnIndex].nameNode;
if (nameNode == null) {
return false;
}
@@ -1076,6 +1279,18 @@ public class MiniDFSCluster {
}
return isUp;
}
+
+ /**
+ * Returns true if all the NameNodes are running and are out of Safe Mode.
+ */
+ public boolean isClusterUp() {
+ for (int index = 0; index < nameNodes.length; index++) {
+ if (!isNameNodeUp(index)) {
+ return false;
+ }
+ }
+ return true;
+ }
/**
* Returns true if there is at least one DataNode running.
@@ -1084,33 +1299,55 @@ public class MiniDFSCluster {
if (dataNodes == null || dataNodes.size() == 0) {
return false;
}
- return true;
+ for (DataNodeProperties dn : dataNodes) {
+ if (dn.datanode.isDatanodeUp()) {
+ return true;
+ }
+ }
+ return false;
}
/**
- * Get a client handle to the DFS cluster.
+ * Get a client handle to the DFS cluster with a single namenode.
*/
public FileSystem getFileSystem() throws IOException {
- return FileSystem.get(conf);
+ checkSingleNameNode();
+ return getFileSystem(0);
}
+ /**
+ * Get a client handle to the DFS cluster for the namenode at given index.
+ */
+ public FileSystem getFileSystem(int nnIndex) throws IOException {
+ return FileSystem.get(getURI(nnIndex), nameNodes[nnIndex].conf);
+ }
/**
* Get another FileSystem instance that is different from FileSystem.get(conf).
* This simulating different threads working on different FileSystem instances.
*/
- public FileSystem getNewFileSystemInstance() throws IOException {
- return FileSystem.newInstance(conf);
+ public FileSystem getNewFileSystemInstance(int nnIndex) throws IOException {
+ return FileSystem.newInstance(getURI(nnIndex), nameNodes[nnIndex].conf);
+ }
+
+ /**
+ * @return an HTTP URL
+ */
+ public String getHttpUri(int nnIndex) throws IOException {
+ return "http://"
+ + nameNodes[nnIndex].conf
+ .get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
}
/**
* @return a {@link HftpFileSystem} object.
*/
- public HftpFileSystem getHftpFileSystem() throws IOException {
- final String str = "hftp://"
- + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+ public HftpFileSystem getHftpFileSystem(int nnIndex) throws IOException {
+ String uri = "hftp://"
+ + nameNodes[nnIndex].conf
+ .get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
try {
- return (HftpFileSystem)FileSystem.get(new URI(str), conf);
+ return (HftpFileSystem)FileSystem.get(new URI(uri), conf);
} catch (URISyntaxException e) {
throw new IOException(e);
}
@@ -1120,14 +1357,14 @@ public class MiniDFSCluster {
* @return a {@link HftpFileSystem} object as specified user.
*/
public HftpFileSystem getHftpFileSystemAs(final String username,
- final Configuration conf, final String... groups
- ) throws IOException, InterruptedException {
+ final Configuration conf, final int nnIndex, final String... groups)
+ throws IOException, InterruptedException {
final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
username, groups);
return ugi.doAs(new PrivilegedExceptionAction<HftpFileSystem>() {
@Override
public HftpFileSystem run() throws Exception {
- return getHftpFileSystem();
+ return getHftpFileSystem(nnIndex);
}
});
}
@@ -1135,31 +1372,29 @@ public class MiniDFSCluster {
/**
* Get the directories where the namenode stores its image.
*/
- public Collection<URI> getNameDirs() {
- return FSNamesystem.getNamespaceDirs(conf);
+ public Collection<URI> getNameDirs(int nnIndex) {
+ return FSNamesystem.getNamespaceDirs(nameNodes[nnIndex].conf);
}
/**
* Get the directories where the namenode stores its edits.
*/
- public Collection<URI> getNameEditsDirs() {
- return FSNamesystem.getNamespaceEditsDirs(conf);
+ public Collection<URI> getNameEditsDirs(int nnIndex) {
+ return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
}
- /**
- * Wait until the cluster is active and running.
- */
- public void waitActive() throws IOException {
- if (nameNode == null) {
+ /** Wait until the given namenode gets registration from all the datanodes */
+ public void waitActive(int nnIndex) throws IOException {
+ if (nameNodes.length == 0 || nameNodes[nnIndex] == null) {
return;
}
- InetSocketAddress addr = new InetSocketAddress("localhost",
- getNameNodePort());
+ InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
DFSClient client = new DFSClient(addr, conf);
- // make sure all datanodes have registered and sent heartbeat
- while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) {
+ // ensure all datanodes have registered and sent heartbeat to the namenode
+ while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE), addr)) {
try {
+ LOG.info("Waiting for cluster to become active");
Thread.sleep(100);
} catch (InterruptedException e) {
}
@@ -1168,10 +1403,39 @@ public class MiniDFSCluster {
client.close();
}
- private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) {
+ /**
+ * Wait until the cluster is active and running.
+ */
+ public void waitActive() throws IOException {
+ for (int index = 0; index < nameNodes.length; index++) {
+ waitActive(index);
+ }
+ }
+
+ private synchronized boolean shouldWait(DatanodeInfo[] dnInfo,
+ InetSocketAddress addr) {
+ // If a datanode failed to start, then do not wait
+ for (DataNodeProperties dn : dataNodes) {
+ // the datanode thread communicating with the namenode should be alive
+ if (!dn.datanode.isBPServiceAlive(addr)) {
+ LOG.warn("BPOfferService failed to start in datanode " + dn.datanode
+ + " for namenode at " + addr);
+ return false;
+ }
+ }
+
+ // Wait for expected number of datanodes to start
if (dnInfo.length != numDataNodes) {
return true;
}
+
+ // if one of the data nodes is not fully started, continue to wait
+ for (DataNodeProperties dn : dataNodes) {
+ if (!dn.datanode.isDatanodeFullyStarted()) {
+ return true;
+ }
+ }
+
// make sure all datanodes have sent first heartbeat to namenode,
// using (capacity == 0) as proxy.
for (DatanodeInfo dn : dnInfo) {
@@ -1179,6 +1443,13 @@ public class MiniDFSCluster {
return true;
}
}
+
+ // If datanode dataset is not initialized then wait
+ for (DataNodeProperties dn : dataNodes) {
+ if (dn.datanode.data == null) {
+ return true;
+ }
+ }
return false;
}
@@ -1195,11 +1466,12 @@ public class MiniDFSCluster {
* @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes()
* @return the block report for the specified data node
*/
- public Iterable<Block> getBlockReport(int dataNodeIndex) {
+ public Iterable<Block> getBlockReport(String bpid, int dataNodeIndex) {
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
throw new IndexOutOfBoundsException();
}
- return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport();
+ return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport(
+ bpid);
}
@@ -1208,11 +1480,11 @@ public class MiniDFSCluster {
* @return block reports from all data nodes
* BlockListAsLongs is indexed in the same order as the list of datanodes returned by getDataNodes()
*/
- public Iterable<Block>[] getAllBlockReports() {
+ public Iterable<Block>[] getAllBlockReports(String bpid) {
int numDataNodes = dataNodes.size();
Iterable<Block>[] result = new BlockListAsLongs[numDataNodes];
for (int i = 0; i < numDataNodes; ++i) {
- result[i] = getBlockReport(i);
+ result[i] = getBlockReport(bpid, i);
}
return result;
}
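Block reports are now scoped to a block pool, so callers look up the pool id first. Illustrative fragment for a single-namenode cluster (cluster is assumed):

    // Sketch: per-block-pool block reports.
    String bpid = cluster.getNamesystem().getBlockPoolId();
    Iterable<Block>[] reports = cluster.getAllBlockReports(bpid);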
@@ -1235,11 +1507,30 @@ public class MiniDFSCluster {
if (!(dataSet instanceof SimulatedFSDataset)) {
throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
}
+ String bpid = getNamesystem().getBlockPoolId();
SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
- sdataset.injectBlocks(blocksToInject);
- dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0);
+ sdataset.injectBlocks(bpid, blocksToInject);
+ dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
}
-
+
+ /**
+ * Multiple-NameNode version of {@link #injectBlocks(Iterable[])}.
+ */
+ public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
+ Iterable<Block> blocksToInject) throws IOException {
+ if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
+ throw new IndexOutOfBoundsException();
+ }
+ FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset();
+ if (!(dataSet instanceof SimulatedFSDataset)) {
+ throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
+ }
+ String bpid = getNamesystem(nameNodeIndex).getBlockPoolId();
+ SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
+ sdataset.injectBlocks(bpid, blocksToInject);
+ dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
+ }
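In a federated cluster each namenode owns a distinct block pool, which is what the extra index on this overload selects. Hedged usage (blocksToInject is assumed; the datanodes must be simulated):

    // Sketch: inject blocks under the second namenode's block pool.
    cluster.injectBlocks(1, 0, blocksToInject); // nnIndex = 1, dnIndex = 0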
+
/**
* This method is valid only if the data nodes have simulated data
* @param blocksToInject - blocksToInject[] is indexed in the same order as the list
@@ -1249,7 +1540,8 @@ public class MiniDFSCluster {
* if any of blocks already exist in the data nodes
* Note the rest of the blocks are not injected.
*/
- public void injectBlocks(Iterable<Block>[] blocksToInject) throws IOException {
+ public void injectBlocks(Iterable<Block>[] blocksToInject)
+ throws IOException {
if (blocksToInject.length > dataNodes.size()) {
throw new IndexOutOfBoundsException();
}
@@ -1288,6 +1580,163 @@ public class MiniDFSCluster {
public static String getBaseDirectory() {
return System.getProperty("test.build.data", "build/test/data") + "/dfs/";
}
+
+ /**
+ * Get a storage directory for a datanode. There are two storage directories
+ * per datanode:
+ * <ol>
+ * <li><base directory>/data/data<2*dnIndex + 1></li>
+ * <li><base directory>/data/data<2*dnIndex + 2></li>
+ * </ol>
+ *
+ * @param dnIndex datanode index (starts from 0)
+ * @param dirIndex directory index (0 or 1). Index 0 provides access to the
+ * first storage directory. Index 1 provides access to the second
+ * storage directory.
+ * @return Storage directory
+ */
+ public static File getStorageDir(int dnIndex, int dirIndex) {
+ return new File(getBaseDirectory() + "data/data" + (2*dnIndex + 1 + dirIndex));
+ }
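With the default test.build.data of build/test/data, the arithmetic works out as follows (illustrative):

    File d0 = MiniDFSCluster.getStorageDir(0, 0); // build/test/data/dfs/data/data1
    File d1 = MiniDFSCluster.getStorageDir(0, 1); // build/test/data/dfs/data/data2
    File d2 = MiniDFSCluster.getStorageDir(1, 0); // build/test/data/dfs/data/data3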
+
+ /**
+ * Get current directory corresponding to the datanode
+ * @param storageDir
+ * @return current directory
+ */
+ public static String getDNCurrentDir(File storageDir) {
+ return storageDir + "/" + Storage.STORAGE_DIR_CURRENT + "/";
+ }
+
+ /**
+ * Get directory corresponding to block pool directory in the datanode
+ * @param storageDir
+ * @return block pool directory
+ */
+ public static String getBPDir(File storageDir, String bpid) {
+ return getDNCurrentDir(storageDir) + bpid + "/";
+ }
+ /**
+ * Get directory relative to block pool directory in the datanode
+ * @param storageDir
+ * @return directory under the block pool directory
+ */
+ public static String getBPDir(File storageDir, String bpid, String dirName) {
+ return getBPDir(storageDir, bpid) + dirName + "/";
+ }
+
+ /**
+ * Get RBW directory for a block pool
+ * @param storageDir storage directory
+ * @param bpid Block pool Id
+ * @return RBW directory for a block pool
+ */
+ public static File getRbwDir(File storageDir, String bpid) {
+ return new File(getBPDir(storageDir, bpid, Storage.STORAGE_DIR_CURRENT)
+ + DataStorage.STORAGE_DIR_RBW );
+ }
+
+ /**
+ * Get finalized directory for a block pool
+ * @param storageDir storage directory
+ * @param bpid Block pool Id
+ * @return finalized directory for a block pool
+ */
+ public static File getFinalizedDir(File storageDir, String bpid) {
+ return new File(getBPDir(storageDir, bpid, Storage.STORAGE_DIR_CURRENT)
+ + DataStorage.STORAGE_DIR_FINALIZED );
+ }
+
+ /**
+ * Get file corresponding to a block
+ * @param storageDir storage directory
+ * @param blk the block
+ * @return file corresponding to the block
+ */
+ public static File getBlockFile(File storageDir, ExtendedBlock blk) {
+ return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()),
+ blk.getBlockName());
+ }
+
+ /**
+ * Get all files related to a block from all the datanodes
+ * @param block block for which corresponding files are needed
+ */
+ public File[] getAllBlockFiles(ExtendedBlock block) {
+ if (dataNodes.size() == 0) return new File[0];
+ ArrayList<File> list = new ArrayList<File>();
+ for (int i=0; i < dataNodes.size(); i++) {
+ File blockFile = getBlockFile(i, block);
+ if (blockFile != null) {
+ list.add(blockFile);
+ }
+ }
+ return list.toArray(new File[list.size()]);
+ }
+
+ /**
+ * Get files related to a block for a given datanode
+ * @param dnIndex Index of the datanode to get block files for
+ * @param block block for which corresponding files are needed
+ */
+ public static File getBlockFile(int dnIndex, ExtendedBlock block) {
+ // Check for block file in the two storage directories of the datanode
+ for (int i = 0; i <=1 ; i++) {
+ File storageDir = MiniDFSCluster.getStorageDir(dnIndex, i);
+ File blockFile = getBlockFile(storageDir, block);
+ if (blockFile.exists()) {
+ return blockFile;
+ }
+ }
+ return null;
+ }
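Putting the path helpers together, a test can resolve a finalized replica on disk without hard-coding the old data/dataN/current/finalized layout. A hedged sketch (cluster and blk are assumed):

    // Sketch: locate the on-disk file of a finalized replica.
    String bpid = cluster.getNamesystem().getBlockPoolId();
    File storage = MiniDFSCluster.getStorageDir(0, 0);
    File finalizedDir = MiniDFSCluster.getFinalizedDir(storage, bpid);
    // Or search both storage dirs of datanode 0 directly:
    File replica = MiniDFSCluster.getBlockFile(0, blk);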
+
+ /**
+ * Throw an exception if the MiniDFSCluster is not started with a single
+ * namenode
+ */
+ private void checkSingleNameNode() {
+ if (nameNodes.length != 1) {
+ throw new IllegalArgumentException("Namenode index is needed");
+ }
+ }
+
+ /**
+ * Add a namenode to a federated cluster and start it. Configuration of
+ * datanodes in the cluster is refreshed to register with the new namenode.
+ *
+ * @return newly started namenode
+ */
+ public NameNode addNameNode(Configuration conf, int namenodePort)
+ throws IOException {
+ if(!federation)
+ throw new IOException("cannot add namenode to non-federated cluster");
+
+ int nnIndex = nameNodes.length;
+ int numNameNodes = nameNodes.length + 1;
+ NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes];
+ System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length);
+ nameNodes = newlist;
+ String nameserviceId = NAMESERVICE_ID_PREFIX + (nnIndex + 1);
+
+ String nameserviceIds = conf.get(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES);
+ nameserviceIds += "," + nameserviceId;
+ conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nameserviceIds);
+
+ initFederatedNamenodeAddress(conf, nameserviceId, namenodePort);
+ createFederatedNameNode(nnIndex, conf, numDataNodes, true, true, null,
+ null, nameserviceId);
+
+ // Refresh datanodes with the newly started namenode
+ for (DataNodeProperties dn : dataNodes) {
+ DataNode datanode = dn.datanode;
+ datanode.refreshNamenodes(conf);
+ }
+
+ // Wait for new namenode to get registrations from all the datanodes
+ waitActive(nnIndex);
+ return nameNodes[nnIndex].nameNode;
+ }
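Growing a federated cluster at runtime then looks roughly like this (hedged sketch; port 0 requests an ephemeral port, and conf is the cluster's original Configuration):

    // Sketch: start federated with one namenode, then add a second.
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .federation(true)
        .numDataNodes(2)
        .build();
    NameNode nn2 = cluster.addNameNode(conf, 0);
    assertEquals(2, cluster.getNumNameNodes());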
private int getFreeSocketPort() {
int port = 0;
@@ -1322,9 +1771,7 @@ public class MiniDFSCluster {
private void addToFile(String p, String address) throws IOException {
File f = new File(p);
- if (!f.exists()) {
- f.createNewFile();
- }
+ f.createNewFile();
PrintWriter writer = new PrintWriter(new FileWriter(f, true));
try {
writer.println(address);
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java Fri Apr 29 18:16:32 2011
@@ -19,21 +19,18 @@ package org.apache.hadoop.hdfs;
import java.io.File;
import java.io.IOException;
-import java.util.Properties;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.BlockMissingException;
@@ -64,7 +61,7 @@ public class TestBlockMissingException e
// extract block locations from File system. Wait till file is closed.
LocatedBlocks locations = null;
locations = fileSys.dfs.getNamenode().getBlockLocations(file1.toString(),
- 0, numBlocks * blockSize);
+ 0, numBlocks * blockSize);
// remove block of file
LOG.info("Remove first block of file");
corruptBlock(file1, locations.get(0).getBlock());
@@ -119,46 +116,15 @@ public class TestBlockMissingException e
assertTrue("Expected BlockMissingException ", gotException);
}
- /*
- * The Data directories for a datanode
- */
- private File[] getDataNodeDirs(int i) throws IOException {
- String base_dir = MiniDFSCluster.getBaseDirectory();
- File data_dir = new File(base_dir, "data");
- File dir1 = new File(data_dir, "data"+(2*i+1));
- File dir2 = new File(data_dir, "data"+(2*i+2));
- if (dir1.isDirectory() && dir2.isDirectory()) {
- File[] dir = new File[2];
- dir[0] = new File(dir1, MiniDFSCluster.FINALIZED_DIR_NAME);
- dir[1] = new File(dir2, MiniDFSCluster.FINALIZED_DIR_NAME);
- return dir;
- }
- return new File[0];
- }
-
//
// Corrupt specified block of file
//
- void corruptBlock(Path file, Block blockNum) throws IOException {
- long id = blockNum.getBlockId();
-
- // Now deliberately remove/truncate data blocks from the block.
- //
- for (int i = 0; i < NUM_DATANODES; i++) {
- File[] dirs = getDataNodeDirs(i);
-
- for (int j = 0; j < dirs.length; j++) {
- File[] blocks = dirs[j].listFiles();
- assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
- for (int idx = 0; idx < blocks.length; idx++) {
- if (blocks[idx].getName().startsWith("blk_" + id) &&
- !blocks[idx].getName().endsWith(".meta")) {
- blocks[idx].delete();
- LOG.info("Deleted block " + blocks[idx]);
- }
- }
- }
+ void corruptBlock(Path file, ExtendedBlock blk) {
+ // Now deliberately remove/truncate data blocks from the file.
+ File[] blockFiles = dfs.getAllBlockFiles(blk);
+ for (File f : blockFiles) {
+ f.delete();
+ LOG.info("Deleted block " + f);
}
}
-
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Fri Apr 29 18:16:32 2011
@@ -30,8 +30,6 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.never;
-import static org.junit.Assert.*;
-
public class TestClientBlockVerification {
static BlockReaderTestUtil util = null;
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java Fri Apr 29 18:16:32 2011
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -51,7 +51,7 @@ public class TestClientProtocolForPipeli
Path file = new Path("dataprotocol.dat");
DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
// get the first blockid for the file
- Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+ ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
// test getNewStampAndToken on a finalized block
try {
@@ -64,8 +64,8 @@ public class TestClientProtocolForPipeli
// test getNewStampAndToken on a non-existent block
try {
long newBlockId = firstBlock.getBlockId() + 1;
- Block newBlock = new Block(newBlockId, 0,
- firstBlock.getGenerationStamp());
+ ExtendedBlock newBlock = new ExtendedBlock(firstBlock.getBlockPoolId(),
+ newBlockId, 0, firstBlock.getGenerationStamp());
namenode.updateBlockForPipeline(newBlock, "");
Assert.fail("Cannot get a new GS from a non-existent block");
} catch (IOException e) {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java Fri Apr 29 18:16:32 2011
@@ -31,6 +31,7 @@ import static org.junit.Assert.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
/**
@@ -82,8 +83,9 @@ public class TestCrcCorruption {
// file disallows this Datanode from sending data to another datanode.
// However, a client is allowed access to this block.
//
- File data_dir = new File(System.getProperty("test.build.data"),
- "dfs/data/data1" + MiniDFSCluster.FINALIZED_DIR_NAME);
+ File storageDir = MiniDFSCluster.getStorageDir(0, 0);
+ String bpid = cluster.getNamesystem().getBlockPoolId();
+ File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
assertTrue("data directory does not exist", data_dir.exists());
File[] blocks = data_dir.listFiles();
assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
@@ -135,12 +137,13 @@ public class TestCrcCorruption {
}
}
}
+
//
// Now deliberately corrupt all meta blocks from the second
// directory of the first datanode
//
- data_dir = new File(System.getProperty("test.build.data"),
- "dfs/data/data2" + MiniDFSCluster.FINALIZED_DIR_NAME);
+ storageDir = MiniDFSCluster.getStorageDir(0, 1);
+ data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
assertTrue("data directory does not exist", data_dir.exists());
blocks = data_dir.listFiles();
assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
@@ -256,7 +259,7 @@ public class TestCrcCorruption {
DFSTestUtil.createFile(fs, file, fileSize, replFactor, 12345L /*seed*/);
DFSTestUtil.waitReplication(fs, file, replFactor);
- String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file);
int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
assertEquals("All replicas not corrupted", replFactor, blockFilesCorrupted);
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Apr 29 18:16:32 2011
@@ -43,9 +43,9 @@ import org.apache.hadoop.fs.FileChecksum
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -157,7 +157,7 @@ public class TestDFSClientRetries extend
};
when(mockNN.addBlock(anyString(),
anyString(),
- any(Block.class),
+ any(ExtendedBlock.class),
any(DatanodeInfo[].class))).thenAnswer(answer);
final DFSClient client = new DFSClient(null, mockNN, conf, null);
@@ -489,7 +489,7 @@ public class TestDFSClientRetries extend
public void run() {
try {
- fs = cluster.getNewFileSystemInstance();
+ fs = cluster.getNewFileSystemInstance(0);
int bufferSize = len;
byte[] buf = new byte[bufferSize];
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java Fri Apr 29 18:16:32 2011
@@ -69,7 +69,7 @@ public class TestDFSFinalize extends Tes
assertEquals(
UpgradeUtilities.checksumContents(
DATA_NODE, new File(dataNodeDirs[i],"current")),
- UpgradeUtilities.checksumMasterContents(DATA_NODE));
+ UpgradeUtilities.checksumMasterDataNodeContents());
}
for (int i = 0; i < nameNodeDirs.length; i++) {
assertFalse(new File(nameNodeDirs[i],"previous").isDirectory());
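checksumMasterContents(nodeType) is split into per-role helpers because, with federation, the namenode and datanode master images no longer share one checksum. For namenode dirs the counterpart check would look like the sketch below, assuming UpgradeUtilities gained a matching checksumMasterNameNodeContents():

    assertEquals(
        UpgradeUtilities.checksumContents(
            NAME_NODE, new File(nameNodeDirs[i], "current")),
        UpgradeUtilities.checksumMasterNameNodeContents());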
@@ -95,10 +95,10 @@ public class TestDFSFinalize extends Tes
String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
log("Finalize with existing previous dir", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
- UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
- UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+ UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+ UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
cluster = new MiniDFSCluster.Builder(conf)
.format(false)
.manageDataDfsDirs(false)
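The renamed helpers make the target role explicit. The surrounding pattern, restated as a compact sketch: pre-populate both roles' storage by hand, then start a cluster that must reuse it rather than reformat it.

    UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
    UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
    UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
    UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .format(false)             // keep the hand-built storage layout
        .manageDataDfsDirs(false)  // Builder must not recreate dataNodeDirs
        .manageNameDfsDirs(false)  // ...nor nameNodeDirs
        .build();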
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java Fri Apr 29 18:16:32 2011
@@ -76,7 +76,7 @@ public class TestDFSRollback extends Tes
assertEquals(
UpgradeUtilities.checksumContents(
nodeType, new File(baseDirs[i],"current")),
- UpgradeUtilities.checksumMasterContents(nodeType));
+ UpgradeUtilities.checksumMasterDataNodeContents());
}
break;
}
@@ -104,17 +104,17 @@ public class TestDFSRollback extends Tes
}
/**
- * Attempts to start a DataNode with the given operation. Starting
- * the DataNode should throw an exception.
+ * Attempts to start a DataNode with the given operation. Starting
+ * the given block pool should fail.
+ * @param operation startup option
+ * @param bpid block pool Id that should fail to start
+ * @throws IOException
*/
- void startDataNodeShouldFail(StartupOption operation) {
- try {
- cluster.startDataNodes(conf, 1, false, operation, null); // should fail
- throw new AssertionError("DataNode should have failed to start");
- } catch (Exception expected) {
- // expected
- assertFalse(cluster.isDataNodeUp());
- }
+ void startBlockPoolShouldFail(StartupOption operation, String bpid)
+ throws IOException {
+ cluster.startDataNodes(conf, 1, false, operation, null); // should fail
+ assertFalse("Block pool " + bpid + " should have failed to start",
+ cluster.getDataNodes().get(0).isBPServiceAlive(bpid));
}
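Under federation a datanode can outlive the failure of one block pool, so the check moves from process level (isDataNodeUp()) to the per-pool service state. A sketch of the assertion, assuming a single datanode:

    // The datanode process may stay up even though the pool failed,
    // so assert on the block-pool service rather than the whole node.
    DataNode dn = cluster.getDataNodes().get(0);
    assertFalse("block pool " + bpid + " should be dead",
        dn.isBPServiceAlive(bpid));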
/**
@@ -125,6 +125,7 @@ public class TestDFSRollback extends Tes
File[] baseDirs;
UpgradeUtilities.initialize();
+ StorageInfo storageInfo = null;
for (int numDirs = 1; numDirs <= 2; numDirs++) {
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
@@ -133,8 +134,8 @@ public class TestDFSRollback extends Tes
String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
log("Normal NameNode rollback", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
@@ -146,16 +147,16 @@ public class TestDFSRollback extends Tes
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("Normal DataNode rollback", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.startupOption(StartupOption.ROLLBACK)
.build();
- UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
- UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
+ UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+ UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
cluster.startDataNodes(conf, 1, false, StartupOption.ROLLBACK, null);
checkResult(DATA_NODE, dataNodeDirs);
cluster.shutdown();
@@ -163,67 +164,77 @@ public class TestDFSRollback extends Tes
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
log("NameNode rollback without existing previous dir", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
startNameNodeShouldFail(StartupOption.ROLLBACK);
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("DataNode rollback without existing previous dir", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.startupOption(StartupOption.UPGRADE)
.build();
- UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
+ UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
cluster.startDataNodes(conf, 1, false, StartupOption.ROLLBACK, null);
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
log("DataNode rollback with future stored layout version in previous", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.startupOption(StartupOption.ROLLBACK)
.build();
- UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
- baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
- UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
- new StorageInfo(Integer.MIN_VALUE,
- UpgradeUtilities.getCurrentNamespaceID(cluster),
- UpgradeUtilities.getCurrentFsscTime(cluster)));
- startDataNodeShouldFail(StartupOption.ROLLBACK);
+ UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+ baseDirs = UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
+ storageInfo = new StorageInfo(Integer.MIN_VALUE,
+ UpgradeUtilities.getCurrentNamespaceID(cluster),
+ UpgradeUtilities.getCurrentClusterID(cluster),
+ UpgradeUtilities.getCurrentFsscTime(cluster));
+
+ UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
+ UpgradeUtilities.getCurrentBlockPoolID(cluster));
+
+ startBlockPoolShouldFail(StartupOption.ROLLBACK,
+ cluster.getNamesystem().getBlockPoolId());
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
log("DataNode rollback with newer fsscTime in previous", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.startupOption(StartupOption.ROLLBACK)
.build();
- UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
- baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
- UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
- new StorageInfo(UpgradeUtilities.getCurrentLayoutVersion(),
- UpgradeUtilities.getCurrentNamespaceID(cluster),
- Long.MAX_VALUE));
- startDataNodeShouldFail(StartupOption.ROLLBACK);
+
+ UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+ baseDirs = UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
+ storageInfo = new StorageInfo(UpgradeUtilities.getCurrentLayoutVersion(),
+ UpgradeUtilities.getCurrentNamespaceID(cluster),
+ UpgradeUtilities.getCurrentClusterID(cluster), Long.MAX_VALUE);
+
+ UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
+ UpgradeUtilities.getCurrentBlockPoolID(cluster));
+
+ startBlockPoolShouldFail(StartupOption.ROLLBACK,
+ cluster.getNamesystem().getBlockPoolId());
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
log("NameNode rollback with no edits file", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
for (File f : baseDirs) {
FileUtil.fullyDelete(new File(f,"edits"));
}
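The rollback cases above build the datanode VERSION file by hand; StorageInfo now records the cluster id as a fourth field, and the writer additionally needs the block pool id. Condensed into one sketch, using only calls shown in this hunk:

    StorageInfo info = new StorageInfo(
        UpgradeUtilities.getCurrentLayoutVersion(),
        UpgradeUtilities.getCurrentNamespaceID(cluster),
        UpgradeUtilities.getCurrentClusterID(cluster),
        UpgradeUtilities.getCurrentFsscTime(cluster));
    UpgradeUtilities.createDataNodeVersionFile(baseDirs, info,
        UpgradeUtilities.getCurrentBlockPoolID(cluster));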
@@ -231,8 +242,8 @@ public class TestDFSRollback extends Tes
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode rollback with no image file", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
for (File f : baseDirs) {
FileUtil.fullyDelete(new File(f,"fsimage"));
}
@@ -240,8 +251,8 @@ public class TestDFSRollback extends Tes
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode rollback with corrupt version file", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
for (File f : baseDirs) {
UpgradeUtilities.corruptFile(new File(f,"VERSION"));
}
@@ -249,12 +260,15 @@ public class TestDFSRollback extends Tes
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode rollback with old layout version in previous", numDirs);
- UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
- baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
- UpgradeUtilities.createVersionFile(conf, NAME_NODE, baseDirs,
- new StorageInfo(1,
- UpgradeUtilities.getCurrentNamespaceID(null),
- UpgradeUtilities.getCurrentFsscTime(null)));
+ UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+ baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+ storageInfo = new StorageInfo(1,
+ UpgradeUtilities.getCurrentNamespaceID(null),
+ UpgradeUtilities.getCurrentClusterID(null),
+ UpgradeUtilities.getCurrentFsscTime(null));
+
+ UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs,
+ storageInfo, UpgradeUtilities.getCurrentBlockPoolID(cluster));
startNameNodeShouldFail(StartupOption.UPGRADE);
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
} // end numDir loop
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java Fri Apr 29 18:16:32 2011
@@ -1108,11 +1108,12 @@ public class TestDFSShell extends TestCa
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
List<File> files = new ArrayList<File>();
List<DataNode> datanodes = cluster.getDataNodes();
- Iterable<Block>[] blocks = cluster.getAllBlockReports();
+ String poolId = cluster.getNamesystem().getBlockPoolId();
+ Iterable<Block>[] blocks = cluster.getAllBlockReports(poolId);
for(int i = 0; i < blocks.length; i++) {
FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset();
for(Block b : blocks[i]) {
- files.add(ds.getBlockFile(b));
+ files.add(ds.getBlockFile(poolId, b));
}
}
return files;
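Block reports and replica lookups are now keyed by pool id end to end. A usage sketch of the pattern above, assuming FSDataset is the dataset implementation in play:

    String poolId = cluster.getNamesystem().getBlockPoolId();
    Iterable<Block>[] reports = cluster.getAllBlockReports(poolId);
    for (int i = 0; i < reports.length; i++) {
      FSDataset ds = (FSDataset) cluster.getDataNodes().get(i).getFSDataset();
      for (Block b : reports[i]) {
        // The same pool id is needed to resolve the on-disk file.
        File f = ds.getBlockFile(poolId, b);
        assertTrue("replica file should exist", f.exists());
      }
    }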