You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/12/15 21:55:59 UTC
svn commit: r890983 - in /hadoop/mapreduce/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/docs/src/documentation/content/xdocs/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
Author: tomwhite
Date: Tue Dec 15 20:55:58 2009
New Revision: 890983
URL: http://svn.apache.org/viewvc?rev=890983&view=rev
Log:
MAPREDUCE-967. TaskTracker does not need to fully unjar job jars. Contributed by Todd Lipcon.
Added:
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Dec 15 20:55:58 2009
@@ -74,6 +74,9 @@
MAPREDUCE-1209. Move common specific part of the test TestReflectionUtils
out of mapred into common. (Todd Lipcon via tomwhite)
+ MAPREDUCE-967. TaskTracker does not need to fully unjar job jars.
+ (Todd Lipcon via tomwhite)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Dec 15 20:55:58 2009
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.regex.Pattern;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -272,9 +273,16 @@
values = cmdLine.getOptionValues("file");
if (values != null && values.length > 0) {
+ StringBuilder unpackRegex = new StringBuilder(
+ config_.getPattern(JobContext.JAR_UNPACK_PATTERN,
+ JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern());
for (String file : values) {
packageFiles_.add(file);
+ String fname = new File(file).getName();
+ unpackRegex.append("|(?:").append(Pattern.quote(fname)).append(")");
}
+ config_.setPattern(JobContext.JAR_UNPACK_PATTERN,
+ Pattern.compile(unpackRegex.toString()));
validate(packageFiles_);
}
Added: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java?rev=890983&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java Tue Dec 15 20:55:58 2009
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.zip.GZIPOutputStream;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
+/**
+ * This class tests that the '-file' argument to streaming results
+ * in files being unpacked in the job working directory.
+ */
+public class TestFileArgs extends TestStreaming
+{
+ private MiniDFSCluster dfs = null;
+ private MiniMRCluster mr = null;
+ private FileSystem fileSys = null;
+ private String strJobTracker = null;
+ private String strNamenode = null;
+ private String namenode = null;
+ private Configuration conf = null;
+
+ private static final String EXPECTED_OUTPUT =
+ "job.jar\t\nsidefile\t\ntmp\t\n";
+
+ private static final String LS_PATH = "/bin/ls";
+
+ public TestFileArgs() throws IOException
+ {
+ // Set up mini cluster
+ conf = new Configuration();
+ dfs = new MiniDFSCluster(conf, 1, true, null);
+ fileSys = dfs.getFileSystem();
+ namenode = fileSys.getUri().getAuthority();
+ mr = new MiniMRCluster(1, namenode, 1);
+ strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.getJobTrackerPort();
+ strNamenode = "fs.default.name=hdfs://" + namenode;
+
+ FileSystem.setDefaultUri(conf, "hdfs://" + namenode);
+
+ // Set up side file
+ FileSystem localFs = FileSystem.getLocal(conf);
+ DataOutputStream dos = localFs.create(new Path("sidefile"));
+ dos.write("hello world\n".getBytes("UTF-8"));
+ dos.close();
+ }
+
+ @Override
+ protected String getExpectedOutput() {
+ return EXPECTED_OUTPUT;
+ }
+
+ @Override
+ protected Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ protected String[] genArgs() {
+ return new String[] {
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-file", new java.io.File("sidefile").getAbsolutePath(),
+ "-mapper", LS_PATH,
+ "-numReduceTasks", "0",
+ "-jobconf", strNamenode,
+ "-jobconf", strJobTracker,
+ "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
+ };
+ }
+
+
+ public static void main(String[]args) throws Exception
+ {
+ new TestFileArgs().testCommandLine();
+ }
+
+}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Tue Dec 15 20:55:58 2009
@@ -22,6 +22,10 @@
import java.io.*;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+
/**
* This class tests hadoopStreaming in MapReduce local mode.
@@ -50,11 +54,15 @@
utilTest.redirectIfAntJunit();
}
+ protected String getInputData() {
+ return input;
+ }
+
protected void createInput() throws IOException
{
- DataOutputStream out = new DataOutputStream(
- new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
- out.write(input.getBytes("UTF-8"));
+ DataOutputStream out = getFileSystem().create(
+ new Path(INPUT_FILE.getAbsolutePath()));
+ out.write(getInputData().getBytes("UTF-8"));
out.close();
}
@@ -70,7 +78,29 @@
"-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
};
}
-
+
+ protected Configuration getConf() {
+ return new Configuration();
+ }
+
+ protected FileSystem getFileSystem() throws IOException {
+ return FileSystem.get(getConf());
+ }
+
+ protected String getExpectedOutput() {
+ return outputExpect;
+ }
+
+ protected void checkOutput() throws IOException {
+ Path outPath = new Path(OUTPUT_DIR.getAbsolutePath(), "part-00000");
+ FileSystem fs = getFileSystem();
+ String output = StreamUtil.slurpHadoop(outPath, fs);
+ fs.delete(outPath, true);
+ System.err.println("outEx1=" + getExpectedOutput());
+ System.err.println(" out1=" + output);
+ assertEquals(getExpectedOutput(), output);
+ }
+
public void testCommandLine() throws IOException
{
try {
@@ -84,14 +114,10 @@
// During tests, the default Configuration will use a local mapred
// So don't specify -config or -cluster
- job = new StreamJob(genArgs(), mayExit);
- job.go();
- File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
- String output = StreamUtil.slurp(outFile);
- outFile.delete();
- System.err.println("outEx1=" + outputExpect);
- System.err.println(" out1=" + output);
- assertEquals(outputExpect, output);
+ job = new StreamJob(genArgs(), mayExit);
+ int ret = job.go();
+ assertEquals(0, ret);
+ checkOutput();
} finally {
try {
INPUT_FILE.delete();
Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Tue Dec 15 20:55:58 2009
@@ -1507,9 +1507,11 @@
<li><code>${mapreduce.cluster.local.dir}/taskTracker/jobcache/$jobid/jars/</code>
: The jars directory, which has the job jar file and expanded jar.
The <code>job.jar</code> is the application's jar file that is
- automatically distributed to each machine. It is expanded in jars
- directory before the tasks for the job start. The job.jar location
- is accessible to the application through the api
+ automatically distributed to each machine. Any library jars that are dependencies
+ of the application code may be packaged inside this jar in a <code>lib/</code> directory.
+ This directory is extracted from <code>job.jar</code> and its contents are
+ automatically added to the classpath for each task.
+ The job.jar location is accessible to the application through the api
<a href="ext:api/org/apache/hadoop/mapred/jobconf/getjar">
JobConf.getJar() </a>. To access the unjarred directory,
JobConf.getJar().getParent() can be called.</li>
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Tue Dec 15 20:55:58 2009
@@ -24,6 +24,7 @@
import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -165,6 +166,10 @@
static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
JobContext.REDUCE_MEMORY_MB;
+ /** Pattern for the default unpacking behavior for job jars */
+ public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
+ Pattern.compile("(?:classes/|lib/).*");
+
/**
* Configuration key to set the java command line options for the child
* map and reduce tasks.
@@ -418,6 +423,14 @@
* @param jar the user jar for the map-reduce job.
*/
public void setJar(String jar) { set(JobContext.JAR, jar); }
+
+ /**
+ * Get the pattern for jar contents to unpack on the tasktracker
+ */
+ public Pattern getJarUnpackPattern() {
+ return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
+ }
+
/**
* Set the job's jar file by finding an example class location.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Dec 15 20:55:58 2009
@@ -649,7 +649,7 @@
}
}
classPaths.add(new File(jobCacheDir, "classes").toString());
- classPaths.add(jobCacheDir.toString());
+ classPaths.add(new File(jobCacheDir, "job.jar").toString());
}
/**
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Dec 15 20:55:58 2009
@@ -121,6 +121,7 @@
static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
"mapred.tasktracker.pmem.reserved";
+
static final long WAIT_FOR_DONE = 3 * 1000;
private int httpPort;
@@ -1008,10 +1009,11 @@
localJobConf.setJar(localJarFile.toString());
- // Also un-jar the job.jar files. We un-jar it so that classes inside
- // sub-directories, for e.g., lib/, classes/ are available on class-path
- RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile
- .getParent().toString()));
+ // Un-jar the parts of the job.jar that need to be added to the classpath
+ RunJar.unJar(
+ new File(localJarFile.toString()),
+ new File(localJarFile.getParent().toString()),
+ localJobConf.getJarUnpackPattern());
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=890983&r1=890982&r2=890983&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Dec 15 20:55:58 2009
@@ -55,6 +55,7 @@
public static final String JAR = "mapreduce.job.jar";
public static final String ID = "mapreduce.job.id";
public static final String JOB_NAME = "mapreduce.job.name";
+ public static final String JAR_UNPACK_PATTERN = "mapreduce.job.jar.unpack.pattern";
public static final String USER_NAME = "mapreduce.job.user.name";
public static final String PRIORITY = "mapreduce.job.priority";
public static final String QUEUE_NAME = "mapreduce.job.queuename";