You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/09/26 09:08:24 UTC
[06/50] [abbrv] hadoop git commit: MAPREDUCE-6774. Add support for HDFS erasure code policy to TestDFSIO. Contributed by Sammi Chen
MAPREDUCE-6774. Add support for HDFS erasure code policy to TestDFSIO. Contributed by Sammi Chen
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/501a7785
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/501a7785
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/501a7785
Branch: refs/heads/MAPREDUCE-6608
Commit: 501a77856d6b6edfb261547117e719da7a9cd221
Parents: 58bae35
Author: Kai Zheng <ka...@intel.com>
Authored: Sun Sep 18 09:03:15 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Sun Sep 18 09:03:15 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/TestDFSIO.java | 159 +++++++++++++++----
1 file changed, 124 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/501a7785/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
index 05d4d77..d218169 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -106,9 +107,14 @@ public class TestDFSIO implements Tool {
" [-nrFiles N]" +
" [-size Size[B|KB|MB|GB|TB]]" +
" [-resFile resultFileName] [-bufferSize Bytes]" +
- " [-storagePolicy storagePolicyName]";
+ " [-storagePolicy storagePolicyName]" +
+ " [-erasureCodePolicy erasureCodePolicyName]";
private Configuration config;
+ private static final String STORAGE_POLICY_NAME_KEY =
+ "test.io.block.storage.policy";
+ private static final String ERASURE_CODE_POLICY_NAME_KEY =
+ "test.io.erasure.code.policy";
static{
Configuration.addDefaultResource("hdfs-default.xml");
@@ -211,9 +217,9 @@ public class TestDFSIO implements Tool {
bench = new TestDFSIO();
bench.getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
cluster = new MiniDFSCluster.Builder(bench.getConf())
- .numDataNodes(2)
- .format(true)
- .build();
+ .numDataNodes(2)
+ .format(true)
+ .build();
FileSystem fs = cluster.getFileSystem();
bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES);
@@ -356,7 +362,7 @@ public class TestDFSIO implements Tool {
ReflectionUtils.newInstance(codec, getConf());
}
- blockStoragePolicy = getConf().get("test.io.block.storage.policy", null);
+ blockStoragePolicy = getConf().get(STORAGE_POLICY_NAME_KEY, null);
}
@Override // IOMapperBase
@@ -388,9 +394,10 @@ public class TestDFSIO implements Tool {
*/
public static class WriteMapper extends IOStatMapper {
- public WriteMapper() {
- for(int i=0; i < bufferSize; i++)
- buffer[i] = (byte)('0' + i % 50);
+ public WriteMapper() {
+ for (int i = 0; i < bufferSize; i++) {
+ buffer[i] = (byte) ('0' + i % 50);
+ }
}
@Override // IOMapperBase
@@ -431,6 +438,9 @@ public class TestDFSIO implements Tool {
fs.delete(getDataDir(config), true);
fs.delete(writeDir, true);
long tStart = System.currentTimeMillis();
+ if (isECEnabled()) {
+ createAndEnableECOnPath(fs, getDataDir(config));
+ }
runIOTest(WriteMapper.class, writeDir);
long execTime = System.currentTimeMillis() - tStart;
return execTime;
@@ -734,6 +744,7 @@ public class TestDFSIO implements Tool {
TestType testType = null;
int bufferSize = DEFAULT_BUFFER_SIZE;
long nrBytes = 1*MEGA;
+ String erasureCodePolicyName = null;
int nrFiles = 1;
long skipSize = 0;
String resFileName = DEFAULT_RES_FILE_NAME;
@@ -785,26 +796,31 @@ public class TestDFSIO implements Tool {
resFileName = args[++i];
} else if (args[i].equalsIgnoreCase("-storagePolicy")) {
storagePolicy = args[++i];
+ } else if (args[i].equalsIgnoreCase("-erasureCodePolicy")) {
+ erasureCodePolicyName = args[++i];
} else {
System.err.println("Illegal argument: " + args[i]);
return -1;
}
}
- if(testType == null)
+ if (testType == null) {
return -1;
- if(testType == TestType.TEST_TYPE_READ_BACKWARD)
+ }
+ if (testType == TestType.TEST_TYPE_READ_BACKWARD) {
skipSize = -bufferSize;
- else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
+ } else if (testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0) {
skipSize = bufferSize;
+ }
LOG.info("nrFiles = " + nrFiles);
LOG.info("nrBytes (MB) = " + toMB(nrBytes));
LOG.info("bufferSize = " + bufferSize);
- if(skipSize > 0)
+ if (skipSize > 0) {
LOG.info("skipSize = " + skipSize);
+ }
LOG.info("baseDir = " + getBaseDir(config));
- if(compressionClass != null) {
+ if (compressionClass != null) {
config.set("test.io.compression.class", compressionClass);
LOG.info("compressionClass = " + compressionClass);
}
@@ -813,31 +829,16 @@ public class TestDFSIO implements Tool {
config.setLong("test.io.skip.size", skipSize);
FileSystem fs = FileSystem.get(config);
- if (storagePolicy != null) {
- boolean isValid = false;
- Collection<BlockStoragePolicy> storagePolicies =
- ((DistributedFileSystem) fs).getAllStoragePolicies();
- try {
- for (BlockStoragePolicy policy : storagePolicies) {
- if (policy.getName().equals(storagePolicy)) {
- isValid = true;
- break;
- }
- }
- } catch (Exception e) {
- throw new IOException("Get block storage policies error: ", e);
- }
- if (!isValid) {
- System.out.println("Invalid block storage policy: " + storagePolicy);
- System.out.println("Current supported storage policy list: ");
- for (BlockStoragePolicy policy : storagePolicies) {
- System.out.println(policy.getName());
- }
+ if (erasureCodePolicyName != null) {
+ if (!checkErasureCodePolicy(erasureCodePolicyName, fs, testType)) {
return -1;
}
+ }
- config.set("test.io.block.storage.policy", storagePolicy);
- LOG.info("storagePolicy = " + storagePolicy);
+ if (storagePolicy != null) {
+ if (!checkStoragePolicy(storagePolicy, fs)) {
+ return -1;
+ }
}
if (isSequential) {
@@ -908,6 +909,94 @@ public class TestDFSIO implements Tool {
return ((float)bytes)/MEGA;
}
+ private boolean checkErasureCodePolicy(String erasureCodePolicyName,
+ FileSystem fs, TestType testType) throws IOException {
+ Collection<ErasureCodingPolicy> list =
+ ((DistributedFileSystem) fs).getAllErasureCodingPolicies();
+ boolean isValid = false;
+ for (ErasureCodingPolicy ec : list) {
+ if (erasureCodePolicyName.equals(ec.getName())) {
+ isValid = true;
+ break;
+ }
+ }
+
+ if (!isValid) {
+ System.out.println("Invalid erasure code policy: " +
+ erasureCodePolicyName);
+ System.out.println("Current supported erasure code policy list: ");
+ for (ErasureCodingPolicy ec : list) {
+ System.out.println(ec.getName());
+ }
+ return false;
+ }
+
+ if (testType == TestType.TEST_TYPE_APPEND ||
+ testType == TestType.TEST_TYPE_TRUNCATE) {
+ System.out.println("So far append or truncate operation" +
+ " does not support erasureCodePolicy");
+ return false;
+ }
+
+ config.set(ERASURE_CODE_POLICY_NAME_KEY, erasureCodePolicyName);
+ LOG.info("erasureCodePolicy = " + erasureCodePolicyName);
+ return true;
+ }
+
+ private boolean checkStoragePolicy(String storagePolicy, FileSystem fs)
+ throws IOException {
+ boolean isValid = false;
+ Collection<BlockStoragePolicy> storagePolicies =
+ ((DistributedFileSystem) fs).getAllStoragePolicies();
+ try {
+ for (BlockStoragePolicy policy : storagePolicies) {
+ if (policy.getName().equals(storagePolicy)) {
+ isValid = true;
+ break;
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException("Get block storage policies error: ", e);
+ }
+
+ if (!isValid) {
+ System.out.println("Invalid block storage policy: " + storagePolicy);
+ System.out.println("Current supported storage policy list: ");
+ for (BlockStoragePolicy policy : storagePolicies) {
+ System.out.println(policy.getName());
+ }
+ return false;
+ }
+
+ config.set(STORAGE_POLICY_NAME_KEY, storagePolicy);
+ LOG.info("storagePolicy = " + storagePolicy);
+ return true;
+ }
+
+ private boolean isECEnabled() {
+ String erasureCodePolicyName =
+ getConf().get(ERASURE_CODE_POLICY_NAME_KEY, null);
+ return erasureCodePolicyName != null ? true : false;
+ }
+
+ void createAndEnableECOnPath(FileSystem fs, Path path)
+ throws IOException {
+ String erasureCodePolicyName =
+ getConf().get(ERASURE_CODE_POLICY_NAME_KEY, null);
+
+ fs.mkdirs(path);
+ Collection<ErasureCodingPolicy> list =
+ ((DistributedFileSystem) fs).getAllErasureCodingPolicies();
+ for (ErasureCodingPolicy ec : list) {
+ if (erasureCodePolicyName.equals(ec.getName())) {
+ ((DistributedFileSystem) fs).setErasureCodingPolicy(path, ec);
+ LOG.info("enable erasureCodePolicy = " + erasureCodePolicyName +
+ " on " + path.toString());
+ break;
+ }
+ }
+ }
+
private void analyzeResult( FileSystem fs,
TestType testType,
long execTime,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org