You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:23:51 UTC

svn commit: r1076935 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/streaming/src/java/org/apache/hadoop/streaming/ contrib/streaming/src/test/org/apache/hadoop/streaming/ docs/src/documentation/content/xdocs/

Author: omalley
Date: Fri Mar  4 03:23:50 2011
New Revision: 1076935

URL: http://svn.apache.org/viewvc?rev=1076935&view=rev
Log:
commit c26d7d9df5b469722b2d154320e1f49e734e9b43
Author: Lee Tucker <lt...@yahoo-inc.com>
Date:   Thu Jul 30 17:40:20 2009 -0700

    Applying patch 2417466.4842.patch

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
    hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/streaming.xml

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java?rev=1076935&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java Fri Mar  4 03:23:50 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/** Reuses PipeReducer, but supplies the -combiner command instead of the reducer's. */
+public class PipeCombiner extends PipeReducer {
+  String getPipeCommand(JobConf job) {
+    String encodedCmd = job.get("stream.combine.streamprocessor");
+    if (encodedCmd == null) {
+      return null; // no streaming combiner command was configured
+    }
+    try { // StreamJob stores this value URL-encoded with UTF-8
+      return URLDecoder.decode(encodedCmd, "UTF-8");
+    } catch (UnsupportedEncodingException uee) { // UTF-8 is JVM-mandated
+      System.err.println("stream.combine.streamprocessor in jobconf not found");
+      return null;
+    }
+  }
+  /** Piping is enabled exactly when a combine command is configured. */
+  boolean getDoPipe() { return getPipeCommand(job_) != null; }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=1076935&r1=1076934&r2=1076935&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Fri Mar  4 03:23:50 2011
@@ -453,7 +453,8 @@ public class StreamJob implements Tool {
     System.out.println("  -input    <path>     DFS input file(s) for the Map step");
     System.out.println("  -output   <path>     DFS output directory for the Reduce step");
     System.out.println("  -mapper   <cmd|JavaClassName>      The streaming command to run");
-    System.out.println("  -combiner <JavaClassName> Combiner has to be a Java class");
+    System.out.println("  -combiner <cmd|JavaClassName>" + 
+                       " The streaming command to run");
     System.out.println("  -reducer  <cmd|JavaClassName>      The streaming command to run");
     System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
     System.out.println("  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.");
@@ -710,7 +711,9 @@ public class StreamJob implements Tool {
       if (c != null) {
         jobConf_.setCombinerClass(c);
       } else {
-        fail("-combiner : class not found : " + comCmd_);
+        jobConf_.setCombinerClass(PipeCombiner.class);
+        jobConf_.set("stream.combine.streamprocessor", URLEncoder.encode(
+                comCmd_, "UTF-8"));
       }
     }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=1076935&r1=1076934&r2=1076935&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Fri Mar  4 03:23:50 2011
@@ -39,7 +39,7 @@ public class TestStreaming extends TestC
   protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
   protected String outputExpect = "Rare\t\nRblue\t\nRbunnies\t\nRpink\t\nRred\t\nRroses\t\nRviolets\t\n";
 
-  private StreamJob job;
+  protected StreamJob job;
 
   public TestStreaming() throws IOException
   {

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java?rev=1076935&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java Fri Mar  4 03:23:50 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.Counters;
+
+public class TestStreamingCombiner extends TestStreaming {
+
+  protected String combine = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{""});
+  
+  public TestStreamingCombiner() throws IOException {
+    super();
+  }
+  
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+      "-combiner", combine,
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+  public void testCommandLine() throws IOException {
+    super.testCommandLine();
+    // validate combiner counters
+    String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
+    Counters counters = job.running_.getCounters();
+    assertTrue(counters.findCounter(
+               counterGrp, "COMBINE_INPUT_RECORDS").getValue() != 0);
+    assertTrue(counters.findCounter(
+               counterGrp, "COMBINE_OUTPUT_RECORDS").getValue() != 0);
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestStreamingCombiner().testCommandLine();
+  }
+
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/streaming.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/streaming.xml?rev=1076935&r1=1076934&r2=1076935&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/streaming.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/streaming.xml Fri Mar  4 03:23:50 2011
@@ -124,7 +124,7 @@ Just as with a normal Map/Reduce job, yo
    -inputformat JavaClassName
    -outputformat JavaClassName
    -partitioner JavaClassName
-   -combiner JavaClassName
+   -combiner streamingCommand or JavaClassName
 </source>
 <p>
 The class you supply for the input format should return key/value pairs of Text class. If you do not specify an input format class, the TextInputFormat is used as the default. Since the TextInputFormat returns keys of LongWritable class, which are actually not part of the input data, the keys will be discarded; only the values will be piped to the streaming mapper.