Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/08/24 14:17:53 UTC

hadoop git commit: MAPREDUCE-6578. Add support for HDFS heterogeneous storage testing to TestDFSIO. Contributed by Wei Zhou and Sammi Chen

Repository: hadoop
Updated Branches:
  refs/heads/trunk 793447f79 -> 0ce1ab95c


MAPREDUCE-6578. Add support for HDFS heterogeneous storage testing to TestDFSIO. Contributed by Wei Zhou and Sammi Chen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0ce1ab95
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0ce1ab95
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0ce1ab95

Branch: refs/heads/trunk
Commit: 0ce1ab95cc1178f9ea763fd1f5a65a890b23b0de
Parents: 793447f
Author: Kai Zheng <ka...@intel.com>
Authored: Wed Aug 24 22:17:05 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Wed Aug 24 22:17:05 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/TestDFSIO.java    | 53 +++++++++++++++++---
 1 file changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ce1ab95/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
index e7aa66b..05d4d77 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
@@ -29,6 +29,7 @@ import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.text.DecimalFormat;
+import java.util.Collection;
 import java.util.Date;
 import java.util.Random;
 import java.util.StringTokenizer;
@@ -36,7 +37,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -102,7 +105,8 @@ public class TestDFSIO implements Tool {
                     " [-compression codecClassName]" +
                     " [-nrFiles N]" +
                     " [-size Size[B|KB|MB|GB|TB]]" +
-                    " [-resFile resultFileName] [-bufferSize Bytes]";
+                    " [-resFile resultFileName] [-bufferSize Bytes]" +
+                    " [-storagePolicy storagePolicyName]";
 
   private Configuration config;
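
The new -storagePolicy option names the HDFS block storage policy applied to every file the write test creates. A hypothetical invocation (the jar path is illustrative, not from this patch; ONE_SSD is one of the built-in HDFS policies):

    hadoop jar hadoop-mapreduce-client-jobclient-*-tests.jar TestDFSIO \
        -write -nrFiles 10 -size 128MB -storagePolicy ONE_SSD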
 
@@ -305,7 +309,7 @@ public class TestDFSIO implements Tool {
         writer = null;
       }
     }
-    LOG.info("created control files for: "+nrFiles+" files");
+    LOG.info("created control files for: " + nrFiles + " files");
   }
 
   private static String getFileName(int fIdx) {
@@ -326,6 +330,7 @@ public class TestDFSIO implements Tool {
    */
   private abstract static class IOStatMapper extends IOMapperBase<Long> {
     protected CompressionCodec compressionCodec;
+    protected String blockStoragePolicy;
 
     IOStatMapper() {
     }
@@ -350,6 +355,8 @@ public class TestDFSIO implements Tool {
         compressionCodec = (CompressionCodec)
             ReflectionUtils.newInstance(codec, getConf());
       }
+
+      blockStoragePolicy = getConf().get("test.io.block.storage.policy", null);
     }
 
     @Override // IOMapperBase
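
The policy name reaches each map task through the job configuration: the driver sets test.io.block.storage.policy (see the final hunk below) and configure() reads it back in every mapper. A minimal sketch of that round trip; the key name comes from this patch, the policy value and class name are illustrative:

    import org.apache.hadoop.conf.Configuration;

    public class PolicyConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Driver side: record the requested policy in the job conf.
        conf.set("test.io.block.storage.policy", "COLD");
        // Task side: configure() reads it back; null means no policy.
        String policy = conf.get("test.io.block.storage.policy", null);
        System.out.println("policy = " + policy);
      }
    }
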
@@ -389,8 +396,11 @@ public class TestDFSIO implements Tool {
     @Override // IOMapperBase
     public Closeable getIOStream(String name) throws IOException {
       // create file
-      OutputStream out =
-          fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
+      Path filePath = new Path(getDataDir(getConf()), name);
+      OutputStream out = fs.create(filePath, true, bufferSize);
+      if (blockStoragePolicy != null) {
+        fs.setStoragePolicy(filePath, blockStoragePolicy);
+      }
       if(compressionCodec != null)
         out = compressionCodec.createOutputStream(out);
       LOG.info("out = " + out.getClass().getName());
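
Ordering matters here: the policy is pinned immediately after the file is created, before any bytes are written, so the file's blocks are allocated on the requested storage tier from the start. A standalone sketch of the same pattern; the class name, path, buffer size, and policy name are illustrative:

    import java.io.OutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class StoragePolicyWriteSketch {
      public static void main(String[] args) throws Exception {
        // Assumes fs.defaultFS points at an HDFS cluster; storage
        // policies are not supported by the local filesystem.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path filePath = new Path("/benchmarks/sketch/test_io_0");
        // Create the file first, then pin its storage policy before
        // writing any data, mirroring the order used in the patch.
        OutputStream out = fs.create(filePath, true, 4096);
        fs.setStoragePolicy(filePath, "ONE_SSD");
        out.write(new byte[4096]);
        out.close();
      }
    }
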
@@ -713,8 +723,9 @@ public class TestDFSIO implements Tool {
       System.err.print(StringUtils.stringifyException(e));
       res = -2;
     }
-    if(res == -1)
-      System.err.print(USAGE);
+    if (res == -1) {
+      System.err.println(USAGE);
+    }
     System.exit(res);
   }
 
@@ -727,6 +738,7 @@ public class TestDFSIO implements Tool {
     long skipSize = 0;
     String resFileName = DEFAULT_RES_FILE_NAME;
     String compressionClass = null;
+    String storagePolicy = null;
     boolean isSequential = false;
     String version = TestDFSIO.class.getSimpleName() + ".1.8";
 
@@ -771,6 +783,8 @@ public class TestDFSIO implements Tool {
         bufferSize = Integer.parseInt(args[++i]);
       } else if (args[i].equalsIgnoreCase("-resfile")) {
         resFileName = args[++i];
+      } else if (args[i].equalsIgnoreCase("-storagePolicy")) {
+        storagePolicy = args[++i];
       } else {
         System.err.println("Illegal argument: " + args[i]);
         return -1;
@@ -799,6 +813,33 @@ public class TestDFSIO implements Tool {
     config.setLong("test.io.skip.size", skipSize);
     FileSystem fs = FileSystem.get(config);
 
+    if (storagePolicy != null) {
+      boolean isValid = false;
+      Collection<BlockStoragePolicy> storagePolicies =
+          ((DistributedFileSystem) fs).getAllStoragePolicies();
+      try {
+        for (BlockStoragePolicy policy : storagePolicies) {
+          if (policy.getName().equals(storagePolicy)) {
+            isValid = true;
+            break;
+          }
+        }
+      } catch (Exception e) {
+        throw new IOException("Get block storage policies error: ", e);
+      }
+      if (!isValid) {
+        System.out.println("Invalid block storage policy: " + storagePolicy);
+        System.out.println("Current supported storage policy list: ");
+        for (BlockStoragePolicy policy : storagePolicies) {
+          System.out.println(policy.getName());
+        }
+        return -1;
+      }
+
+      config.set("test.io.block.storage.policy", storagePolicy);
+      LOG.info("storagePolicy = " + storagePolicy);
+    }
+
     if (isSequential) {
       long tStart = System.currentTimeMillis();
       sequentialTest(fs, testType, nrBytes, nrFiles);
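
One caveat in the validation above: getAllStoragePolicies() is the call that actually contacts the NameNode, yet it sits outside the try block, and the unchecked cast will fail if the benchmark is pointed at a non-HDFS filesystem. A more defensive variant of the same check, a sketch rather than the committed code:

    import java.io.IOException;
    import java.util.Collection;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;

    public class PolicyCheckSketch {
      static boolean isValidStoragePolicy(FileSystem fs, String name)
          throws IOException {
        // Guard the cast: storage policies are HDFS-specific.
        if (!(fs instanceof DistributedFileSystem)) {
          throw new IOException("Storage policies require HDFS, got "
              + fs.getClass().getName());
        }
        Collection<BlockStoragePolicy> policies;
        try {
          // The RPC itself is what can fail, so it sits inside the try.
          policies = ((DistributedFileSystem) fs).getAllStoragePolicies();
        } catch (IOException e) {
          throw new IOException("Get block storage policies error: ", e);
        }
        for (BlockStoragePolicy policy : policies) {
          if (policy.getName().equals(name)) {
            return true;
          }
        }
        return false;
      }
    }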

