You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2016/10/03 22:58:33 UTC
[20/57] [abbrv] hadoop git commit: HDFS-10824.
MiniDFSCluster#storageCapacities has no effects on real capacity. Contributed
by Xiaobing Zhou.
HDFS-10824. MiniDFSCluster#storageCapacities has no effects on real capacity. Contributed by Xiaobing Zhou.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c3b235e5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c3b235e5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c3b235e5
Branch: refs/heads/HDFS-10467
Commit: c3b235e56597d55387b4003e376faee10b473d55
Parents: e19b37e
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Sep 28 11:47:37 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Sep 28 11:47:37 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 103 ++++++++++++----
.../apache/hadoop/hdfs/TestMiniDFSCluster.java | 119 +++++++++++++++++++
2 files changed, 199 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3b235e5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 3bb3a10..cf02a8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -56,6 +56,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -547,6 +548,8 @@ public class MiniDFSCluster implements AutoCloseable {
protected final int storagesPerDatanode;
private Set<FileSystem> fileSystems = Sets.newHashSet();
+ private List<long[]> storageCap = Lists.newLinkedList();
+
/**
* A unique instance identifier for the cluster. This
* is used to disambiguate HA filesystems in the case where
@@ -1648,31 +1651,64 @@ public class MiniDFSCluster implements AutoCloseable {
}
this.numDataNodes += numDataNodes;
waitActive();
-
+
+ setDataNodeStorageCapacities(
+ curDatanodesNum,
+ numDataNodes,
+ dns,
+ storageCapacities);
+
+ /* memorize storage capacities */
+ if (storageCapacities != null) {
+ storageCap.addAll(Arrays.asList(storageCapacities));
+ }
+ }
+
+ private synchronized void setDataNodeStorageCapacities(
+ final int curDatanodesNum,
+ final int numDNs,
+ final DataNode[] dns,
+ long[][] storageCapacities) throws IOException {
if (storageCapacities != null) {
- for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
+ for (int i = curDatanodesNum; i < curDatanodesNum + numDNs; ++i) {
final int index = i - curDatanodesNum;
- try (FsDatasetSpi.FsVolumeReferences volumes =
- dns[index].getFSDataset().getFsVolumeReferences()) {
- assert storageCapacities[index].length == storagesPerDatanode;
- assert volumes.size() == storagesPerDatanode;
-
- int j = 0;
- for (FsVolumeSpi fvs : volumes) {
- FsVolumeImpl volume = (FsVolumeImpl) fvs;
- LOG.info("setCapacityForTesting " + storageCapacities[index][j]
- + " for [" + volume.getStorageType() + "]" + volume
- .getStorageID());
- volume.setCapacityForTesting(storageCapacities[index][j]);
- j++;
- }
- }
+ setDataNodeStorageCapacities(index, dns[index], storageCapacities);
}
}
}
-
-
-
+
+ private synchronized void setDataNodeStorageCapacities(
+ final int curDnIdx,
+ final DataNode curDn,
+ long[][] storageCapacities) throws IOException {
+
+ if (storageCapacities == null || storageCapacities.length == 0) {
+ return;
+ }
+
+ try {
+ waitDataNodeFullyStarted(curDn);
+ } catch (TimeoutException | InterruptedException e) {
+ throw new IOException(e);
+ }
+
+ try (FsDatasetSpi.FsVolumeReferences volumes = curDn.getFSDataset()
+ .getFsVolumeReferences()) {
+ assert storageCapacities[curDnIdx].length == storagesPerDatanode;
+ assert volumes.size() == storagesPerDatanode;
+
+ int j = 0;
+ for (FsVolumeSpi fvs : volumes) {
+ FsVolumeImpl volume = (FsVolumeImpl) fvs;
+ LOG.info("setCapacityForTesting " + storageCapacities[curDnIdx][j]
+ + " for [" + volume.getStorageType() + "]" + volume.getStorageID());
+ volume.setCapacityForTesting(storageCapacities[curDnIdx][j]);
+ j++;
+ }
+ }
+ DataNodeTestUtils.triggerHeartbeat(curDn);
+ }
+
/**
* Modify the config and start up the DataNodes. The info port for
* DataNodes is guaranteed to use a free port.
@@ -2236,6 +2272,16 @@ public class MiniDFSCluster implements AutoCloseable {
return restartDataNode(dnprop, false);
}
+ private void waitDataNodeFullyStarted(final DataNode dn)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return dn.isDatanodeFullyStarted();
+ }
+ }, 100, 60000);
+ }
+
/**
* Restart a datanode, on the same port if requested
* @param dnprop the datanode to restart
@@ -2256,10 +2302,21 @@ public class MiniDFSCluster implements AutoCloseable {
conf.set(DFS_DATANODE_IPC_ADDRESS_KEY,
addr.getAddress().getHostAddress() + ":" + dnprop.ipcPort);
}
- DataNode newDn = DataNode.createDataNode(args, conf, secureResources);
- dataNodes.add(new DataNodeProperties(
- newDn, newconf, args, secureResources, newDn.getIpcPort()));
+ final DataNode newDn = DataNode.createDataNode(args, conf, secureResources);
+
+ final DataNodeProperties dnp = new DataNodeProperties(
+ newDn,
+ newconf,
+ args,
+ secureResources,
+ newDn.getIpcPort());
+ dataNodes.add(dnp);
numDataNodes++;
+
+ setDataNodeStorageCapacities(
+ dataNodes.lastIndexOf(dnp),
+ newDn,
+ storageCap.toArray(new long[][]{}));
return true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3b235e5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
index 4d027dc..3d4cc72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
@@ -25,16 +25,25 @@ import static org.junit.Assume.assumeTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.test.PathUtils;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.base.Preconditions;
+
/**
* Tests MiniDFS cluster setup/teardown and isolation.
* Every instance is brought up with a new data dir, to ensure that
@@ -78,6 +87,116 @@ public class TestMiniDFSCluster {
}
}
+ /**
+ * Tests storage capacity setting still effective after cluster restart.
+ */
+ @Test(timeout=100000)
+ public void testClusterSetStorageCapacity() throws Throwable {
+
+ final Configuration conf = new HdfsConfiguration();
+ final int numDatanodes = 1;
+ final int defaultBlockSize = 1024;
+ final int blocks = 100;
+ final int blocksSize = 1024;
+ final int fileLen = blocks * blocksSize;
+ final long capcacity = defaultBlockSize * 2 * fileLen;
+ final long[] capacities = new long[] {capcacity, 2 * capcacity};
+
+ final MiniDFSCluster cluster = newCluster(
+ conf,
+ numDatanodes,
+ capacities,
+ defaultBlockSize,
+ fileLen);
+ verifyStorageCapacity(cluster, capacities);
+
+ /* restart all data nodes */
+ cluster.restartDataNodes();
+ cluster.waitActive();
+ verifyStorageCapacity(cluster, capacities);
+
+ /* restart all name nodes */
+ cluster.restartNameNodes();
+ cluster.waitActive();
+ verifyStorageCapacity(cluster, capacities);
+
+ /* restart all name nodes firstly and data nodes then */
+ cluster.restartNameNodes();
+ cluster.restartDataNodes();
+ cluster.waitActive();
+ verifyStorageCapacity(cluster, capacities);
+
+ /* restart all data nodes firstly and name nodes then */
+ cluster.restartDataNodes();
+ cluster.restartNameNodes();
+ cluster.waitActive();
+ verifyStorageCapacity(cluster, capacities);
+ }
+
+ private void verifyStorageCapacity(
+ final MiniDFSCluster cluster,
+ final long[] capacities) throws IOException {
+
+ FsVolumeImpl source = null;
+ FsVolumeImpl dest = null;
+
+ /* verify capacity */
+ for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+ final DataNode dnNode = cluster.getDataNodes().get(i);
+ try (FsDatasetSpi.FsVolumeReferences refs = dnNode.getFSDataset()
+ .getFsVolumeReferences()) {
+ source = (FsVolumeImpl) refs.get(0);
+ dest = (FsVolumeImpl) refs.get(1);
+ assertEquals(capacities[0], source.getCapacity());
+ assertEquals(capacities[1], dest.getCapacity());
+ }
+ }
+ }
+
+ private MiniDFSCluster newCluster(
+ final Configuration conf,
+ final int numDatanodes,
+ final long[] storageCapacities,
+ final int defaultBlockSize,
+ final int fileLen)
+ throws IOException, InterruptedException, TimeoutException {
+
+ conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+
+ final String fileName = "/" + UUID.randomUUID().toString();
+ final Path filePath = new Path(fileName);
+
+ Preconditions.checkNotNull(storageCapacities);
+ Preconditions.checkArgument(
+ storageCapacities.length == 2,
+ "need to specify capacities for two storages.");
+
+ /* Write a file and restart the cluster */
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numDatanodes)
+ .storageCapacities(storageCapacities)
+ .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
+ .storagesPerDatanode(2)
+ .build();
+ cluster.waitActive();
+
+ final short replicationFactor = (short) 1;
+ final Random r = new Random();
+ FileSystem fs = cluster.getFileSystem(0);
+ DFSTestUtil.createFile(
+ fs,
+ filePath,
+ fileLen,
+ replicationFactor,
+ r.nextLong());
+ DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
+
+ return cluster;
+ }
+
@Test(timeout=100000)
public void testIsClusterUpAfterShutdown() throws Throwable {
Configuration conf = new HdfsConfiguration();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org