You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2014/07/12 04:25:03 UTC

svn commit: r1609878 [2/2] - in /hadoop/common/branches/YARN-1051/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-c...

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Sat Jul 12 02:24:40 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -359,6 +360,15 @@ public abstract class FileInputFormat<K,
                                 String[] hosts) {
     return new FileSplit(file, start, length, hosts);
   }
+
+  /**
+   * Like the 4-arg overload but also records in-memory hosts. Split
+   * generation calls this one, so override it to make split sub-types.
+   */
+  protected FileSplit makeSplit(Path file, long start, long length,
+                                String[] hosts, String[] inMemoryHosts) {
+    return new FileSplit(file, start, length, hosts, inMemoryHosts);
+  }
 
   /** 
    * Generate the list of files and make them into FileSplits.
@@ -392,17 +402,20 @@ public abstract class FileInputFormat<K,
           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
-                                     blkLocations[blkIndex].getHosts()));
+                        blkLocations[blkIndex].getHosts(),
+                        blkLocations[blkIndex].getCachedHosts()));
             bytesRemaining -= splitSize;
           }
 
           if (bytesRemaining != 0) {
             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
-                       blkLocations[blkIndex].getHosts()));
+                       blkLocations[blkIndex].getHosts(),
+                       blkLocations[blkIndex].getCachedHosts()));
           }
         } else { // not splitable
-          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
+          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
+                      blkLocations[0].getCachedHosts()));
         }
       } else { 
         //Create empty hosts array for zero length files

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java Sat Jul 12 02:24:40 2014
@@ -22,11 +22,13 @@ import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
 
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -41,6 +43,7 @@ public class FileSplit extends InputSpli
   private long start;
   private long length;
   private String[] hosts;
+  private SplitLocationInfo[] hostInfos;
 
   public FileSplit() {}
 
@@ -57,6 +60,31 @@ public class FileSplit extends InputSpli
     this.length = length;
     this.hosts = hosts;
   }
+
+  /**
+   * Constructs a split with host and cached-blocks information.
+   *
+   * @param file the file name
+   * @param start the position of the first byte in the file to process
+   * @param length the number of bytes in the file to process
+   * @param hosts the list of hosts containing the block; null means none
+   * @param inMemoryHosts the hosts caching the block in memory; null: none
+   */
+  public FileSplit(Path file, long start, long length, String[] hosts,
+      String[] inMemoryHosts) {
+    this(file, start, length, hosts);
+    String[] allHosts = hosts != null ? hosts : new String[0];
+    String[] cached = inMemoryHosts != null ? inMemoryHosts : new String[0];
+    hostInfos = new SplitLocationInfo[allHosts.length];
+    for (int i = 0; i < allHosts.length; i++) {
+      // N is tiny, so scanning is probably faster than building a HashSet
+      boolean inMemory = false;
+      for (int j = 0; j < cached.length && !inMemory; j++) {
+        inMemory = allHosts[i] != null && allHosts[i].equals(cached[j]);
+      }
+      hostInfos[i] = new SplitLocationInfo(allHosts[i], inMemory);
+    }
+  }
  
   /** The file containing this split's data. */
   public Path getPath() { return file; }
@@ -98,4 +126,10 @@ public class FileSplit extends InputSpli
       return this.hosts;
     }
   }
+
+  /** Per-host location info, or null if not supplied at construction. */
+  @Override @Evolving
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return this.hostInfos;
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java Sat Jul 12 02:24:40 2014
@@ -38,6 +38,21 @@ public class HostUtil {
         httpPort + "/tasklog?attemptid=" + taskAttemptID);
   }
 
+  /**
+   * Always throws {@link UnsupportedOperationException} because this method
+   * is not supposed to be called at runtime. It exists only to keep binary
+   * compatibility with Hive 0.13; see MAPREDUCE-5830 for details.
+   * @deprecated Use {@link #getTaskLogUrl(String, String, String, String)}
+   * to construct the taskLogUrl.
+   */
+  @Deprecated
+  public static String getTaskLogUrl(String taskTrackerHostName,
+                                     String httpPort, String taskAttemptID) {
+    throw new UnsupportedOperationException(
+        "This method is not supposed to be called at runtime. " +
+        "Use HostUtil.getTaskLogUrl(String, String, String, String) instead.");
+  }
+
   public static String convertTrackerNameToHostName(String trackerName) {
     // Ugly!
     // Convert the trackerName to its host name

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Sat Jul 12 02:24:40 2014
@@ -83,6 +83,16 @@
 </property>
 
 <property>
+  <name>mapreduce.job.reducer.preempt.delay.sec</name>
+  <value>0</value>
+  <description>The threshold, in seconds, after which an unsatisfied mapper
+  request triggers reducer preemption to free space. The default of 0 means
+  reducers are preempted immediately after allocation if there is currently
+  no room for newly allocated mappers.
+  </description>
+</property>
+
+<property>
     <name>mapreduce.job.max.split.locations</name>
     <value>10</value>
     <description>The max number of block locations to store for each split for 

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1603348-1605891

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Sat Jul 12 02:24:40 2014
@@ -103,6 +103,29 @@ public class TestFileInputFormat {
   }
   
   @Test
+  public void testSplitLocationInfo() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        "test:///a1/a2");
+    JobConf jobConf = new JobConf(conf);
+    TextInputFormat inputFormat = new TextInputFormat();
+    inputFormat.configure(jobConf);
+    FileSplit[] splits = (FileSplit[]) inputFormat.getSplits(jobConf, 1);
+    FileSplit first = splits[0];
+    // The mock FS reports two hosts; only "localhost" caches the block.
+    String[] hosts = first.getLocations();
+    Assert.assertEquals(2, hosts.length);
+    SplitLocationInfo[] infos = first.getLocationInfo();
+    Assert.assertEquals(2, infos.length);
+    int localIdx = hosts[0].equals("localhost") ? 0 : 1;
+    int otherIdx = hosts[0].equals("otherhost") ? 0 : 1;
+    Assert.assertTrue(infos[localIdx].isOnDisk());
+    Assert.assertTrue(infos[localIdx].isInMemory());
+    Assert.assertTrue(infos[otherIdx].isOnDisk());
+    Assert.assertFalse(infos[otherIdx].isInMemory());
+  }
+  
+  @Test
   public void testListStatusSimple() throws IOException {
     Configuration conf = new Configuration();
     conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
@@ -223,8 +246,10 @@ public class TestFileInputFormat {
     public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
         throws IOException {
       return new BlockLocation[] {
-          new BlockLocation(new String[] { "localhost:50010" },
-              new String[] { "localhost" }, 0, len) };
+          // names, hosts, cachedHosts, topologyPaths, offset, length, corrupt
+          new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
+              new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
+              new String[0], 0, len, false) };
     }
 
     @Override

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Sat Jul 12 02:24:40 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.After;
@@ -139,6 +140,28 @@ public class TestFileInputFormat {
         1, mockFs.numListLocatedStatusCalls);
     FileSystem.closeAll();
   }
+
+  @Test
+  public void testSplitLocationInfo() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        "test:///a1/a2");
+    Job job = Job.getInstance(conf);
+    TextInputFormat inputFormat = new TextInputFormat();
+    List<InputSplit> splits = inputFormat.getSplits(job);
+    InputSplit first = splits.get(0);
+    // The mock FS reports two hosts; only "localhost" caches the block.
+    String[] hosts = first.getLocations();
+    Assert.assertEquals(2, hosts.length);
+    SplitLocationInfo[] infos = first.getLocationInfo();
+    Assert.assertEquals(2, infos.length);
+    int localIdx = hosts[0].equals("localhost") ? 0 : 1;
+    int otherIdx = hosts[0].equals("otherhost") ? 0 : 1;
+    Assert.assertTrue(infos[localIdx].isOnDisk());
+    Assert.assertTrue(infos[localIdx].isInMemory());
+    Assert.assertTrue(infos[otherIdx].isOnDisk());
+    Assert.assertFalse(infos[otherIdx].isInMemory());
+  }
 
   @Test
   public void testListStatusSimple() throws IOException {
@@ -402,9 +425,11 @@ public class TestFileInputFormat {
     public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
         throws IOException {
       return new BlockLocation[] {
-          new BlockLocation(new String[] { "localhost:50010" },
-              new String[] { "localhost" }, 0, len) };
+          // names, hosts, cachedHosts, topologyPaths, offset, length, corrupt
+          new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
+              new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
+              new String[0], 0, len, false) };
     }
 
     @Override
     protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java Sat Jul 12 02:24:40 2014
@@ -372,10 +372,13 @@ public class TestFixedLengthInputFormat 
         format.getRecordReader(split, job, voidReporter);
     LongWritable key = reader.createKey();
     BytesWritable value = reader.createValue();
-    while (reader.next(key, value)) {
-      result.add(new String(value.getBytes(), 0, value.getLength()));
+    try {
+      while (reader.next(key, value)) {
+        result.add(new String(value.getBytes(), 0, value.getLength()));
+      }
+    } finally {
+      reader.close();
     }
-    reader.close();
     return result;
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java Sat Jul 12 02:24:40 2014
@@ -183,6 +183,8 @@ public class TestPipeApplication {
       output.setWriter(wr);
       conf.set(Submitter.PRESERVE_COMMANDFILE, "true");
 
+      initStdOut(conf);
+
       Application<WritableComparable<IntWritable>, Writable, IntWritable, Text> application = new Application<WritableComparable<IntWritable>, Writable, IntWritable, Text>(
               conf, rReader, output, reporter, IntWritable.class, Text.class);
       application.getDownlink().flush();

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java Sat Jul 12 02:24:40 2014
@@ -417,15 +417,18 @@ public class TestFixedLengthInputFormat 
         new MapContextImpl<LongWritable, BytesWritable, LongWritable,
         BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
         reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
-    reader.initialize(split, mcontext);
     LongWritable key;
     BytesWritable value;
-    while (reader.nextKeyValue()) {
-      key = reader.getCurrentKey();
-      value = reader.getCurrentValue();
-      result.add(new String(value.getBytes(), 0, value.getLength()));
+    try {
+      reader.initialize(split, mcontext);
+      while (reader.nextKeyValue()) {
+        key = reader.getCurrentKey();
+        value = reader.getCurrentValue();
+        result.add(new String(value.getBytes(), 0, value.getLength()));
+      }
+    } finally {
+      reader.close();
     }
-    reader.close();
     return result;
   }