You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2014/07/12 04:25:03 UTC
svn commit: r1609878 [2/2] - in
/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project: ./ dev-support/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-c...
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Sat Jul 12 02:24:40 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
+import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -359,6 +360,15 @@ public abstract class FileInputFormat<K,
String[] hosts) {
return new FileSplit(file, start, length, hosts);
}
+
+ /**
+ * A factory that makes the split for this class. It can be overridden
+ * by sub-classes to make sub-types
+ */
+ protected FileSplit makeSplit(Path file, long start, long length,
+ String[] hosts, String[] inMemoryHosts) {
+ return new FileSplit(file, start, length, hosts, inMemoryHosts);
+ }
/**
* Generate the list of files and make them into FileSplits.
@@ -392,17 +402,20 @@ public abstract class FileInputFormat<K,
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
- blkLocations[blkIndex].getHosts()));
+ blkLocations[blkIndex].getHosts(),
+ blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
- blkLocations[blkIndex].getHosts()));
+ blkLocations[blkIndex].getHosts(),
+ blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
- splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
+ splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
+ blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java Sat Jul 12 02:24:40 2014
@@ -22,11 +22,13 @@ import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
+import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -41,6 +43,7 @@ public class FileSplit extends InputSpli
private long start;
private long length;
private String[] hosts;
+ private SplitLocationInfo[] hostInfos;
public FileSplit() {}
@@ -57,6 +60,31 @@ public class FileSplit extends InputSpli
this.length = length;
this.hosts = hosts;
}
+
+ /** Constructs a split with host and cached-blocks information
+ *
+ * @param file the file name
+ * @param start the position of the first byte in the file to process
+ * @param length the number of bytes in the file to process
+ * @param hosts the list of hosts containing the block
+ * @param inMemoryHosts the list of hosts containing the block in memory
+ */
+ public FileSplit(Path file, long start, long length, String[] hosts,
+ String[] inMemoryHosts) {
+ this(file, start, length, hosts);
+ hostInfos = new SplitLocationInfo[hosts.length];
+ for (int i = 0; i < hosts.length; i++) {
+ // because N will be tiny, scanning is probably faster than a HashSet
+ boolean inMemory = false;
+ for (String inMemoryHost : inMemoryHosts) {
+ if (inMemoryHost.equals(hosts[i])) {
+ inMemory = true;
+ break;
+ }
+ }
+ hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
+ }
+ }
/** The file containing this split's data. */
public Path getPath() { return file; }
@@ -98,4 +126,10 @@ public class FileSplit extends InputSpli
return this.hosts;
}
}
+
+ @Override
+ @Evolving
+ public SplitLocationInfo[] getLocationInfo() throws IOException {
+ return hostInfos;
+ }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java Sat Jul 12 02:24:40 2014
@@ -38,6 +38,21 @@ public class HostUtil {
httpPort + "/tasklog?attemptid=" + taskAttemptID);
}
+ /**
+ * Always throws {@link RuntimeException} because this method is not
+ * supposed to be called at runtime. This method is only for keeping
+ * binary compatibility with Hive 0.13. MAPREDUCE-5830 for the details.
+ * @deprecated Use {@link #getTaskLogUrl(String, String, String, String)}
+ * to construct the taskLogUrl.
+ */
+ @Deprecated
+ public static String getTaskLogUrl(String taskTrackerHostName,
+ String httpPort, String taskAttemptID) {
+ throw new RuntimeException(
+ "This method is not supposed to be called at runtime. " +
+ "Use HostUtil.getTaskLogUrl(String, String, String, String) instead.");
+ }
+
public static String convertTrackerNameToHostName(String trackerName) {
// Ugly!
// Convert the trackerName to its host name
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Sat Jul 12 02:24:40 2014
@@ -83,6 +83,16 @@
</property>
<property>
+ <name>mapreduce.job.reducer.preempt.delay.sec</name>
+ <value>0</value>
+  <description>The threshold in terms of seconds after which an unsatisfied mapper
+  request triggers reducer preemption to free space. The default of 0 implies that
+  reduce tasks should be preempted immediately after allocation if there is currently no
+  room for newly allocated mappers.
+  </description>
+</property>
+
+<property>
<name>mapreduce.job.max.split.locations</name>
<value>10</value>
<description>The max number of block locations to store for each split for
Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1603348-1605891
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Sat Jul 12 02:24:40 2014
@@ -103,6 +103,29 @@ public class TestFileInputFormat {
}
@Test
+ public void testSplitLocationInfo() throws Exception {
+ Configuration conf = getConfiguration();
+ conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+ "test:///a1/a2");
+ JobConf job = new JobConf(conf);
+ TextInputFormat fileInputFormat = new TextInputFormat();
+ fileInputFormat.configure(job);
+ FileSplit[] splits = (FileSplit[]) fileInputFormat.getSplits(job, 1);
+ String[] locations = splits[0].getLocations();
+ Assert.assertEquals(2, locations.length);
+ SplitLocationInfo[] locationInfo = splits[0].getLocationInfo();
+ Assert.assertEquals(2, locationInfo.length);
+ SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
+ locationInfo[0] : locationInfo[1];
+ SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
+ locationInfo[0] : locationInfo[1];
+ Assert.assertTrue(localhostInfo.isOnDisk());
+ Assert.assertTrue(localhostInfo.isInMemory());
+ Assert.assertTrue(otherhostInfo.isOnDisk());
+ Assert.assertFalse(otherhostInfo.isInMemory());
+ }
+
+ @Test
public void testListStatusSimple() throws IOException {
Configuration conf = new Configuration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
@@ -223,8 +246,9 @@ public class TestFileInputFormat {
public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
throws IOException {
return new BlockLocation[] {
- new BlockLocation(new String[] { "localhost:50010" },
- new String[] { "localhost" }, 0, len) };
+ new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
+ new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
+ new String[0], 0, len, false) };
}
@Override
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Sat Jul 12 02:24:40 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.junit.After;
@@ -139,6 +140,28 @@ public class TestFileInputFormat {
1, mockFs.numListLocatedStatusCalls);
FileSystem.closeAll();
}
+
+ @Test
+ public void testSplitLocationInfo() throws Exception {
+ Configuration conf = getConfiguration();
+ conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+ "test:///a1/a2");
+ Job job = Job.getInstance(conf);
+ TextInputFormat fileInputFormat = new TextInputFormat();
+ List<InputSplit> splits = fileInputFormat.getSplits(job);
+ String[] locations = splits.get(0).getLocations();
+ Assert.assertEquals(2, locations.length);
+ SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo();
+ Assert.assertEquals(2, locationInfo.length);
+ SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
+ locationInfo[0] : locationInfo[1];
+ SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
+ locationInfo[0] : locationInfo[1];
+ Assert.assertTrue(localhostInfo.isOnDisk());
+ Assert.assertTrue(localhostInfo.isInMemory());
+ Assert.assertTrue(otherhostInfo.isOnDisk());
+ Assert.assertFalse(otherhostInfo.isInMemory());
+ }
@Test
public void testListStatusSimple() throws IOException {
@@ -402,9 +425,9 @@ public class TestFileInputFormat {
public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
throws IOException {
return new BlockLocation[] {
- new BlockLocation(new String[] { "localhost:50010" },
- new String[] { "localhost" }, 0, len) };
- }
+ new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
+ new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
+            new String[0], 0, len, false) };
+  }
@Override
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java Sat Jul 12 02:24:40 2014
@@ -372,10 +372,13 @@ public class TestFixedLengthInputFormat
format.getRecordReader(split, job, voidReporter);
LongWritable key = reader.createKey();
BytesWritable value = reader.createValue();
- while (reader.next(key, value)) {
- result.add(new String(value.getBytes(), 0, value.getLength()));
+ try {
+ while (reader.next(key, value)) {
+ result.add(new String(value.getBytes(), 0, value.getLength()));
+ }
+ } finally {
+ reader.close();
}
- reader.close();
return result;
}
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java Sat Jul 12 02:24:40 2014
@@ -183,6 +183,8 @@ public class TestPipeApplication {
output.setWriter(wr);
conf.set(Submitter.PRESERVE_COMMANDFILE, "true");
+ initStdOut(conf);
+
Application<WritableComparable<IntWritable>, Writable, IntWritable, Text> application = new Application<WritableComparable<IntWritable>, Writable, IntWritable, Text>(
conf, rReader, output, reporter, IntWritable.class, Text.class);
application.getDownlink().flush();
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java Sat Jul 12 02:24:40 2014
@@ -417,15 +417,18 @@ public class TestFixedLengthInputFormat
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
- reader.initialize(split, mcontext);
LongWritable key;
BytesWritable value;
- while (reader.nextKeyValue()) {
- key = reader.getCurrentKey();
- value = reader.getCurrentValue();
- result.add(new String(value.getBytes(), 0, value.getLength()));
+ try {
+ reader.initialize(split, mcontext);
+ while (reader.nextKeyValue()) {
+ key = reader.getCurrentKey();
+ value = reader.getCurrentValue();
+ result.add(new String(value.getBytes(), 0, value.getLength()));
+ }
+ } finally {
+ reader.close();
}
- reader.close();
return result;
}