You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2010/09/28 10:18:49 UTC
svn commit: r1002050 - in /hadoop/mapreduce/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/docs/src/documentation/content/xdocs/
Author: amareshwari
Date: Tue Sep 28 08:18:49 2010
New Revision: 1002050
URL: http://svn.apache.org/viewvc?rev=1002050&view=rev
Log:
MAPREDUCE-1517. Supports running streaming jobs in the background. Contributed by Bochun Bai.
Added:
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/DelayEchoApp.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBackground.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/streaming.xml
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1002050&r1=1002049&r2=1002050&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Sep 28 08:18:49 2010
@@ -130,6 +130,9 @@ Trunk (unreleased changes)
MAPREDUCE-1548. Hadoop archives preserve times and other properties from
original files. (Rodrigo Schmidt via dhruba)
+ MAPREDUCE-1517. Supports streaming job to run in the background. (Bochun Bai
+ via amareshwari)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=1002050&r1=1002049&r2=1002050&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Sep 28 08:18:49 2010
@@ -255,6 +255,7 @@ public class StreamJob implements Tool {
return;
}
verbose_ = cmdLine.hasOption("verbose");
+ background_ = cmdLine.hasOption("background");
debug_ = cmdLine.hasOption("debug")? debug_ + 1 : debug_;
String[] values = cmdLine.getOptionValues("input");
@@ -432,6 +433,7 @@ public class StreamJob implements Tool {
// boolean properties
+ Option background = createBoolOption("background", "Submit the job and don't wait till it completes.");
Option verbose = createBoolOption("verbose", "print verbose output");
Option info = createBoolOption("info", "print verbose output");
Option help = createBoolOption("help", "print this help message");
@@ -459,6 +461,7 @@ public class StreamJob implements Tool {
addOption(cacheFile).
addOption(cacheArchive).
addOption(io).
+ addOption(background).
addOption(verbose).
addOption(info).
addOption(debug).
@@ -510,6 +513,7 @@ public class StreamJob implements Tool {
+ " for input to and output");
System.out.println(" from mapper/reducer commands");
System.out.println(" -lazyOutput Optional. Lazily create Output.");
+ System.out.println(" -background Optional. Submit the job and don't wait till it completes.");
System.out.println(" -verbose Optional. Print verbose output.");
System.out.println(" -info Optional. Print detailed usage.");
System.out.println(" -help Optional. Print help message.");
@@ -997,7 +1001,9 @@ public class StreamJob implements Tool {
running_ = jc_.submitJob(jobConf_);
jobId_ = running_.getID();
jobInfo();
- if (!jc_.monitorAndPrintJob(jobConf_, running_)) {
+ if (background_) {
+ LOG.info("Job is running in background.");
+ } else if (!jc_.monitorAndPrintJob(jobConf_, running_)) {
LOG.error("Job not Successful!");
return 1;
}
@@ -1025,6 +1031,7 @@ public class StreamJob implements Tool {
}
protected String[] argv_;
+ protected boolean background_;
protected boolean verbose_;
protected boolean detailedUsage_;
protected boolean printUsage = false;
Added: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/DelayEchoApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/DelayEchoApp.java?rev=1002050&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/DelayEchoApp.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/DelayEchoApp.java Tue Sep 28 08:18:49 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+/**
+ * A simple Java app that reads stdin line by line and, for each line, sleeps
+ * for the given number of seconds before echoing that line to stdout.
+ */
+public class DelayEchoApp {
+
+ public DelayEchoApp() {
+ }
+
+ public void go(int seconds) throws IOException, InterruptedException {
+ BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+ String line;
+
+ // Echo every input line, sleeping before each one. Note that the delay is
+ // applied once per line read, not once for the whole input.
+ while ((line = in.readLine()) != null) {
+ Thread.sleep(seconds * 1000L);
+ System.out.println(line);
+ }
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ int seconds = 5;
+ if (args.length >= 1) {
+ try {
+ seconds = Integer.valueOf(args[0]);
+ } catch (NumberFormatException e) {
+ // args[0] is not an integer: keep the default delay of 5 seconds.
+ }
+ }
+
+ DelayEchoApp app = new DelayEchoApp();
+ app.go(seconds);
+ }
+}
Added: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBackground.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBackground.java?rev=1002050&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBackground.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBackground.java Tue Sep 28 08:18:49 2010
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import org.junit.Test;
+import org.junit.Before;
+import static org.junit.Assert.*;
+
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Tests that a streaming job can be submitted with the -background option.
+ * The job uses DelayEchoApp with a 10-second delay as mapper and reducer.
+ */
+public class TestStreamingBackground {
+ protected File TEST_DIR = new File("TestStreamingBackground")
+ .getAbsoluteFile();
+ protected File INPUT_FILE = new File(TEST_DIR, "input.txt");
+ protected File OUTPUT_DIR = new File(TEST_DIR, "out");
+
+ protected String tenSecondsTask = StreamUtil.makeJavaCommand(
+ DelayEchoApp.class, new String[] { "10" });
+
+ public TestStreamingBackground() throws IOException {
+ UtilTest utilTest = new UtilTest(getClass().getName());
+ utilTest.checkUserDir();
+ utilTest.redirectIfAntJunit();
+ }
+
+ protected String[] args = new String[] {
+ "-background",
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper", tenSecondsTask,
+ "-reducer", tenSecondsTask,
+ "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp"),
+ "-jobconf", "mapreduce.task.io.sort.mb=10"
+ };
+
+ @Before
+ public void setUp() throws IOException {
+ UtilTest.recursiveDelete(TEST_DIR);
+ assertTrue(TEST_DIR.mkdirs());
+
+ FileOutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile());
+ out.write("hello\n".getBytes());
+ out.close();
+ }
+
+ public void runStreamJob() throws Exception {
+ boolean mayExit = false;
+ int returnStatus = 0;
+
+ StreamJob job = new StreamJob(args, mayExit);
+ returnStatus = job.go();
+
+ assertEquals("Streaming Job expected to succeed", 0, returnStatus);
+ job.running_.killJob(); // NOTE(review): presumably still running, since -background skips monitoring - confirm
+ job.running_.waitForCompletion();
+ }
+
+ @Test
+ public void testBackgroundSubmitOk() throws Exception {
+ runStreamJob();
+ }
+
+}
Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/streaming.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/streaming.xml?rev=1002050&r1=1002049&r2=1002050&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/streaming.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/streaming.xml Tue Sep 28 08:18:49 2010
@@ -97,6 +97,7 @@ For an example, see <a href="streaming.h
<tr><td> -combiner streamingCommand or JavaClassName</td><td> Optional </td><td> Combiner executable for map output</td></tr>
<tr><td> -cmdenv name=value</td><td> Optional </td><td> Pass environment variable to streaming commands</td></tr>
<tr><td> -inputreader spec</td><td> Optional </td><td> Specifies a record reader class (instead of an input format class)</td></tr>
+<tr><td> -background</td><td> Optional </td><td> Submit the job and don't wait till it completes.</td></tr>
<tr><td> -verbose</td><td> Optional </td><td> Verbose output</td></tr>
<tr><td> -lazyOutput</td><td> Optional </td><td> Create output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write)</td></tr>
<tr><td> -numReduceTasks num</td><td> Optional </td><td> Specify the number of reducers</td></tr>