You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/01 22:38:18 UTC

svn commit: r534234 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/

Author: cutting
Date: Tue May  1 13:38:17 2007
New Revision: 534234

URL: http://svn.apache.org/viewvc?view=rev&rev=534234
Log:
HADOOP-1247.  Add support to contrib/streaming for aggregate package.  Contributed by Runping.

Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534234&r1=534233&r2=534234
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue May  1 13:38:17 2007
@@ -297,6 +297,9 @@
 88. HADOOP-1272.  Extract inner classes from FSNamesystem into separate 
     classes.  (Dhruba Borthakur via tomwhite)
 
+89. HADOOP-1247.  Add support to contrib/streaming for aggregate
+    package, formerly called Abacus.  (Runping Qi via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=534234&r1=534233&r2=534234
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue May  1 13:38:17 2007
@@ -26,13 +26,10 @@
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
@@ -49,10 +46,11 @@
 import org.apache.commons.cli2.validation.Validator;
 import org.apache.commons.logging.*;
 
+import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
+import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
@@ -66,7 +64,7 @@
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
-import org.apache.log4j.helpers.OptionConverter;
+
 /** All the client-side work happens here.
  * (Jar packaging, MapRed job submission and monitoring)
  * @author Michel Tourn
@@ -213,7 +211,6 @@
       verbose_ =  cmdLine.hasOption("-verbose");
       detailedUsage_ = cmdLine.hasOption("-info");
       debug_ = cmdLine.hasOption("-debug")? debug_ + 1 : debug_;
-      inputTagged_ = cmdLine.hasOption("-inputtagged"); 
       
       inputSpecs_.addAll(cmdLine.getValues("-input"));
       output_ = (String) cmdLine.getValue("-output"); 
@@ -709,19 +706,14 @@
     if (inReaderSpec_ == null && inputFormatSpec_ == null) {
       fmt = KeyValueTextInputFormat.class;
     } else if (inputFormatSpec_ != null) {
-      if ((inputFormatSpec_.compareToIgnoreCase("KeyValueTextInputFormat") == 0)
-          || (inputFormatSpec_
-              .compareToIgnoreCase("org.apache.hadoop.mapred.KeyValueTextInputFormat") == 0)) {
+      if (inputFormatSpec_.equals(KeyValueTextInputFormat.class.getName())
+          || inputFormatSpec_.equals(KeyValueTextInputFormat.class.getCanonicalName())) {
         fmt = KeyValueTextInputFormat.class;
-      } else if ((inputFormatSpec_
-                  .compareToIgnoreCase("SequenceFileInputFormat") == 0)
-                 || (inputFormatSpec_
-                     .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileInputFormat") == 0)) {
+      } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class.getName())
+          || inputFormatSpec_.equals(SequenceFileInputFormat.class.getCanonicalName())) {
         fmt = SequenceFileInputFormat.class;
-      } else if ((inputFormatSpec_
-                  .compareToIgnoreCase("SequenceFileToLineInputFormat") == 0)
-                 || (inputFormatSpec_
-                     .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileToLineInputFormat") == 0)) {
+      } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getName())
+          || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getCanonicalName())) {
         fmt = SequenceFileAsTextInputFormat.class;
       } else {
         c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage);
@@ -774,12 +766,19 @@
     reducerNone_ = false;
     if (redCmd_ != null) {
       reducerNone_ = redCmd_.equals(REDUCE_NONE);
-      c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
-      if (c != null) {
-        jobConf_.setReducerClass(c);
+      if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
+        jobConf_.setReducerClass(ValueAggregatorReducer.class);
+        jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
       } else {
-        jobConf_.setReducerClass(PipeReducer.class);
-        jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(redCmd_, "UTF-8"));
+
+        c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
+        if (c != null) {
+          jobConf_.setReducerClass(c);
+        } else {
+          jobConf_.setReducerClass(PipeReducer.class);
+          jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
+              redCmd_, "UTF-8"));
+        }
       }
     }
 

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java?view=auto&rev=534234
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java Tue May  1 13:38:17 2007
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.streaming.Environment;
+
+/**
+ * Test mapper (an external application written in Java, so no platform-
+ * specific dependency): prints "LongValueSum:word\t1" per word on stdin.
+ */
+public class StreamAggregate extends TrApp {
+  /** Passes '.' and ' ' through to the TrApp constructor. */
+  public StreamAggregate() {
+    super('.', ' ');
+  }
+
+  /**
+   * Calls testParentJobConfToEnvVars(), then prints one count record per
+   * space-separated word read from stdin. Empty tokens (blank lines, leading
+   * or repeated spaces) are skipped so no record with an empty key is emitted.
+   */
+  public void go() throws IOException {
+    testParentJobConfToEnvVars();
+    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+    for (String line = in.readLine(); line != null; line = in.readLine()) {
+      for (String word : line.split(" ")) {
+        String key = word.trim();
+        if (key.length() > 0) {
+          System.out.println("LongValueSum:" + key + "\t" + "1");
+        }
+      }
+    }
+  }
+
+  /** Entry point: runs the mapper on this process's stdin and stdout. */
+  public static void main(String[] args) throws IOException {
+    new StreamAggregate().go();
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java?view=auto&rev=534234
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java Tue May  1 13:38:17 2007
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests hadoopStreaming in MapReduce local mode, using Hadoop
+ * Aggregate to count the number of occurrences of each word in the input.
+ */
+public class TestStreamAggregate extends TestCase {
+  protected File INPUT_FILE = new File("stream_aggregate_input.txt");
+  protected File OUTPUT_DIR = new File("stream_aggregate_out");
+  protected String input = "roses are red\nviolets are blue\nbunnies are pink\n";
+  // map parses input lines and generates count entries for each word.
+  protected String map = StreamUtil.makeJavaCommand(StreamAggregate.class, new String[]{".", "\\n"});
+  // The aggregate combiner and reducer then sum up the counts.
+  protected String outputExpect = "are\t3\nblue\t1\nbunnies\t1\npink\t1\nred\t1\nroses\t1\nviolets\t1\n";
+  private StreamJob job;
+
+  /** Runs the UtilTest user-dir check and Ant/JUnit redirection. */
+  public TestStreamAggregate() throws IOException {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  /** Writes input to INPUT_FILE as UTF-8, closing the stream even if the write fails. */
+  protected void createInput() throws IOException {
+    OutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile());
+    try {
+      out.write(input.getBytes("UTF-8"));
+    } finally {
+      out.close();
+    }
+  }
+
+  /** Builds the StreamJob arguments: Java mapper command plus the "aggregate" reducer. */
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", "aggregate",
+      //"-verbose",
+      //"-jobconf", "stream.debug=set"
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+  /**
+   * Runs the job and checks part-00000 against outputExpect; everything the
+   * test created is deleted in the finally block, files before the directory.
+   */
+  public void testCommandLine() {
+    File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+    File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+    try {
+      try {
+        OUTPUT_DIR.getAbsoluteFile().delete();
+      } catch (Exception ignored) { // best-effort removal of a leftover directory
+      }
+      createInput();
+      boolean mayExit = false;
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      job = new StreamJob(genArgs(), mayExit);
+      job.go();
+      String output = StreamUtil.slurp(outFile);
+      System.err.println("outEx1=" + outputExpect);
+      System.err.println("  out1=" + output);
+      assertEquals(outputExpect, output);
+    } catch(Exception e) {
+      failTrace(e);
+    } finally {
+      INPUT_FILE.delete();
+      outFile.delete();
+      outFileCRC.delete();
+      OUTPUT_DIR.getAbsoluteFile().delete();
+    }
+  }
+
+  /** Fails the test, using the stack trace of e as the failure message. */
+  private void failTrace(Exception e) {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    fail(sw.toString());
+  }
+
+  /** Runs this test case directly, without a JUnit runner. */
+  public static void main(String[]args) throws Exception {
+    new TestStreamAggregate().testCommandLine();
+  }
+}