You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by oz...@apache.org on 2015/01/26 05:03:30 UTC

hadoop git commit: HADOOP-11450. Cleanup DistCpV1 not to use deprecated methods and fix javadocs. Contributed by Varun Saxena.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 0d6bd6210 -> 7b82c4ab4


HADOOP-11450. Cleanup DistCpV1 not to use deprecated methods and fix javadocs. Contributed by Varun Saxena.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7b82c4ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7b82c4ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7b82c4ab

Branch: refs/heads/trunk
Commit: 7b82c4ab4e84256bcdee6256564f394dcc4e81ab
Parents: 0d6bd62
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Mon Jan 26 12:58:38 2015 +0900
Committer: Tsuyoshi Ozawa <oz...@apache.org>
Committed: Mon Jan 26 12:58:38 2015 +0900

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../java/org/apache/hadoop/tools/DistCpV1.java  | 152 ++++++++-----------
 2 files changed, 68 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b82c4ab/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 8618e38..662f580 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -499,6 +499,9 @@ Release 2.7.0 - UNRELEASED
 
     HADOOP-11419 Improve hadoop-maven-plugins. (Herve Boutemy via stevel)
 
+    HADOOP-11450. Cleanup DistCpV1 not to use deprecated methods and fix
+    javadocs. (Varun Saxena via ozawa)
+
   OPTIMIZATIONS
 
     HADOOP-11323. WritableComparator#compare keeps reference to byte array.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b82c4ab/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java b/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java
index c44b67b..f46c421 100644
--- a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java
+++ b/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java
@@ -51,9 +51,11 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -73,6 +75,7 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
 /**
  * A Map-reduce program to recursively copy directories between
@@ -283,9 +286,8 @@ public class DistCpV1 implements Tool {
       long last = 0L;
       long acc = 0L;
       long cbrem = srcst.getLen();
-      SequenceFile.Reader sl = null;
-      try {
-        sl = new SequenceFile.Reader(fs, src, job);
+      try (SequenceFile.Reader sl =
+          new SequenceFile.Reader(job, Reader.file(src))) {
         for (; sl.next(key, value); last = sl.getPosition()) {
           // if adding this split would put this split past the target size,
           // cut the last split and put this next file in the next split.
@@ -299,9 +301,6 @@ public class DistCpV1 implements Tool {
           acc += key.get();
         }
       }
-      finally {
-        checkAndClose(sl);
-      }
       if (cbrem != 0) {
         splits.add(new FileSplit(src, pos, cbrem, (String[])null));
       }
@@ -438,32 +437,28 @@ public class DistCpV1 implements Tool {
      */
     private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst,
                             Reporter reporter) throws IOException {
-      FSDataInputStream in = null;
-      FSDataOutputStream out = null;
       long bytesCopied = 0L;
-      try {
-        Path srcPath = srcstat.getPath();
-        // open src file
-        in = srcPath.getFileSystem(job).open(srcPath);
+      Path srcPath = srcstat.getPath();
+      // open src file
+      try (FSDataInputStream in = srcPath.getFileSystem(job).open(srcPath)) {
         reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
         // open tmp file
-        out = create(tmpfile, reporter, srcstat);
-        LOG.info("Copying file " + srcPath + " of size " +
-                 srcstat.getLen() + " bytes...");
+        try (FSDataOutputStream out = create(tmpfile, reporter, srcstat)) {
+          LOG.info("Copying file " + srcPath + " of size " +
+                   srcstat.getLen() + " bytes...");
         
-        // copy file
-        for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) {
-          out.write(buffer, 0, bytesRead);
-          bytesCopied += bytesRead;
-          reporter.setStatus(
-              String.format("%.2f ", bytesCopied*100.0/srcstat.getLen())
-              + absdst + " [ " +
-              StringUtils.humanReadableInt(bytesCopied) + " / " +
-              StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
+          // copy file
+          for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) {
+            out.write(buffer, 0, bytesRead);
+            bytesCopied += bytesRead;
+            reporter.setStatus(
+                String.format("%.2f ", bytesCopied*100.0/srcstat.getLen())
+                + absdst + " [ " +
+                TraditionalBinaryPrefix.long2String(bytesCopied, "", 1) + " / "
+                + TraditionalBinaryPrefix.long2String(srcstat.getLen(), "", 1)
+                + " ]");
+          }
         }
-      } finally {
-        checkAndClose(in);
-        checkAndClose(out);
       }
       return bytesCopied;
     }
@@ -471,7 +466,8 @@ public class DistCpV1 implements Tool {
     /**
      * Copy a file to a destination.
      * @param srcstat src path and metadata
-     * @param dstpath dst path
+     * @param relativedst relative dst path
+     * @param outc Log of skipped files
      * @param reporter
      * @throws IOException if copy fails(even if the validation of copy fails)
      */
@@ -570,7 +566,8 @@ public class DistCpV1 implements Tool {
     }
 
     static String bytesString(long b) {
-      return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
+      return b + " bytes (" +
+          TraditionalBinaryPrefix.long2String(b, "", 1) + ")";
     }
 
     /**
@@ -762,6 +759,7 @@ public class DistCpV1 implements Tool {
 
   /**
    * Driver to copy srcPath to destPath depending on required protocol.
+   * @param conf configuration
    * @param args arguments
    */
   static void copy(final Configuration conf, final Arguments args
@@ -838,10 +836,8 @@ public class DistCpV1 implements Tool {
 
     FileSystem dstfs = destPath.getFileSystem(conf);
     Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
-    SequenceFile.Reader in = null;
-    try {
-      in = new SequenceFile.Reader(dstdirlist.getFileSystem(jobconf),
-          dstdirlist, jobconf);
+    try (SequenceFile.Reader in =
+        new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) {
       Text dsttext = new Text();
       FilePair pair = new FilePair(); 
       for(; in.next(dsttext, pair); ) {
@@ -849,8 +845,6 @@ public class DistCpV1 implements Tool {
         updateDestStatus(pair.input, dstfs.getFileStatus(absdst),
             preseved, dstfs);
       }
-    } finally {
-      checkAndClose(in);
     }
   }
 
@@ -876,6 +870,8 @@ public class DistCpV1 implements Tool {
      * @param preservedAttributes Preserved attributes 
      * @param filelimit File limit
      * @param sizelimit Size limit
+     * @param mapredSslConf ssl configuration
+     * @param dryrun
      */
     Arguments(List<Path> srcs, Path basedir, Path dst, Path log,
         EnumSet<Options> flags, String preservedAttributes,
@@ -1266,15 +1262,18 @@ public class DistCpV1 implements Tool {
     long fileCount = 0L, dirCount = 0L, byteCount = 0L, cbsyncs = 0L,
          skipFileCount = 0L, skipByteCount = 0L;
     try (
-        SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs,
-            jobConf, srcfilelist, LongWritable.class, FilePair.class,
-            SequenceFile.CompressionType.NONE);
-        SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs,
-            jobConf, dstfilelist, Text.class, Text.class,
-            SequenceFile.CompressionType.NONE);
-        SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs,
-            jobConf, dstdirlist, Text.class, FilePair.class,
-            SequenceFile.CompressionType.NONE)
+        SequenceFile.Writer src_writer = SequenceFile.createWriter(jobConf,
+            Writer.file(srcfilelist), Writer.keyClass(LongWritable.class),
+            Writer.valueClass(FilePair.class), Writer.compression(
+            SequenceFile.CompressionType.NONE));
+        SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobConf,
+            Writer.file(dstfilelist), Writer.keyClass(Text.class),
+            Writer.valueClass(Text.class), Writer.compression(
+            SequenceFile.CompressionType.NONE));
+        SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobConf,
+            Writer.file(dstdirlist), Writer.keyClass(Text.class),
+            Writer.valueClass(FilePair.class), Writer.compression(
+            SequenceFile.CompressionType.NONE));
     ) {
       // handle the case where the destination directory doesn't exist
       // and we've only a single src directory OR we're updating/overwriting
@@ -1286,7 +1285,8 @@ public class DistCpV1 implements Tool {
       HashSet<Path> parentDirsToCopy = new HashSet<Path>();
       if (args.basedir != null) {
         FileSystem basefs = args.basedir.getFileSystem(conf);
-        basedir = args.basedir.makeQualified(basefs);
+        basedir = args.basedir.makeQualified(
+            basefs.getUri(), basefs.getWorkingDirectory());
         if (!basefs.isDirectory(basedir)) {
           throw new IOException("Basedir " + basedir + " is not a directory.");
         }
@@ -1307,7 +1307,8 @@ public class DistCpV1 implements Tool {
 
         if (basedir != null) {
           root = basedir;
-          Path parent = src.getParent().makeQualified(srcfs);
+          Path parent = src.getParent().makeQualified(
+              srcfs.getUri(), srcfs.getWorkingDirectory());
           while (parent != null && !parent.equals(basedir)) {
             if (!parentDirsToCopy.contains(parent)){
               parentDirsToCopy.add(parent);
@@ -1427,11 +1428,12 @@ public class DistCpV1 implements Tool {
     }
     LOG.info("sourcePathsCount(files+directories)=" + srcCount);
     LOG.info("filesToCopyCount=" + fileCount);
-    LOG.info("bytesToCopyCount=" + StringUtils.humanReadableInt(byteCount));
+    LOG.info("bytesToCopyCount=" +
+             TraditionalBinaryPrefix.long2String(byteCount, "", 1));
     if (update) {
       LOG.info("filesToSkipCopyCount=" + skipFileCount);
       LOG.info("bytesToSkipCopyCount=" +
-               StringUtils.humanReadableInt(skipByteCount));
+               TraditionalBinaryPrefix.long2String(skipByteCount, "", 1));
     }
     if (args.dryrun) {
       return false;
@@ -1475,7 +1477,8 @@ public class DistCpV1 implements Tool {
 
     LOG.info("sourcePathsCount=" + srcCount);
     LOG.info("filesToCopyCount=" + fileCount);
-    LOG.info("bytesToCopyCount=" + StringUtils.humanReadableInt(byteCount));
+    LOG.info("bytesToCopyCount=" +
+             TraditionalBinaryPrefix.long2String(byteCount, "", 1));
     jobConf.setInt(SRC_COUNT_LABEL, srcCount);
     jobConf.setLong(TOTAL_SIZE_LABEL, byteCount);
     
@@ -1559,10 +1562,10 @@ public class DistCpV1 implements Tool {
 
     //write dst lsr results
     final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
-    final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf,
-        dstlsr, Text.class, NullWritable.class,
-        SequenceFile.CompressionType.NONE);
-    try {
+    try (final SequenceFile.Writer writer = SequenceFile.createWriter(jobconf,
+        Writer.file(dstlsr), Writer.keyClass(Text.class),
+        Writer.valueClass(NullWritable.class), Writer.compression(
+        SequenceFile.CompressionType.NONE))) {
       //do lsr to get all file statuses in dstroot
       final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
       for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
@@ -1575,8 +1578,6 @@ public class DistCpV1 implements Tool {
           }
         }
       }
-    } finally {
-      checkAndClose(writer);
     }
 
     //sort lsr results
@@ -1586,13 +1587,11 @@ public class DistCpV1 implements Tool {
     sorter.sort(dstlsr, sortedlsr);
 
     //compare lsr list and dst list  
-    SequenceFile.Reader lsrin = null;
-    SequenceFile.Reader dstin = null;
     long deletedPathsCount = 0;
-    try {
-      lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf);
-      dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf);
-
+    try (SequenceFile.Reader lsrin =
+             new SequenceFile.Reader(jobconf, Reader.file(sortedlsr));
+         SequenceFile.Reader  dstin =
+             new SequenceFile.Reader(jobconf, Reader.file(dstsorted))) {
       //compare sorted lsr list and sorted dst list
       final Text lsrpath = new Text();
       final Text dstpath = new Text();
@@ -1623,9 +1622,6 @@ public class DistCpV1 implements Tool {
           }
         }
       }
-    } finally {
-      checkAndClose(lsrin);
-      checkAndClose(dstin);
     }
     return deletedPathsCount;
   }
@@ -1644,13 +1640,11 @@ public class DistCpV1 implements Tool {
   /** Check whether the file list have duplication. */
   static private void checkDuplication(FileSystem fs, Path file, Path sorted,
     Configuration conf) throws IOException {
-    SequenceFile.Reader in = null;
-    try {
-      SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
-        new Text.Comparator(), Text.class, Text.class, conf);
-      sorter.sort(file, sorted);
-      in = new SequenceFile.Reader(fs, sorted, conf);
-
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+      new Text.Comparator(), Text.class, Text.class, conf);
+    sorter.sort(file, sorted);
+    try (SequenceFile.Reader in =
+         new SequenceFile.Reader(conf, Reader.file(sorted))) {
       Text prevdst = null, curdst = new Text();
       Text prevsrc = null, cursrc = new Text(); 
       for(; in.next(curdst, cursrc); ) {
@@ -1665,24 +1659,8 @@ public class DistCpV1 implements Tool {
         cursrc = new Text();
       }
     }
-    finally {
-      checkAndClose(in);
-    }
   } 
 
-  static boolean checkAndClose(java.io.Closeable io) {
-    if (io != null) {
-      try {
-        io.close();
-      }
-      catch(IOException ioe) {
-        LOG.warn(StringUtils.stringifyException(ioe));
-        return false;
-      }
-    }
-    return true;
-  }
-
   /** An exception class for duplicated source files. */
   public static class DuplicationException extends IOException {
     private static final long serialVersionUID = 1L;