You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/01 22:38:18 UTC
svn commit: r534234 - in /lucene/hadoop/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
Author: cutting
Date: Tue May 1 13:38:17 2007
New Revision: 534234
URL: http://svn.apache.org/viewvc?view=rev&rev=534234
Log:
HADOOP-1247. Add support to contrib/streaming for aggregate package. Contributed by Runping.
Added:
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534234&r1=534233&r2=534234
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue May 1 13:38:17 2007
@@ -297,6 +297,9 @@
88. HADOOP-1272. Extract inner classes from FSNamesystem into separate
classes. (Dhruba Borthakur via tomwhite)
+89. HADOOP-1247. Add support to contrib/streaming for aggregate
+ package, formerly called Abacus. (Runping Qi via cutting)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=534234&r1=534233&r2=534234
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue May 1 13:38:17 2007
@@ -26,13 +26,10 @@
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -49,10 +46,11 @@
import org.apache.commons.cli2.validation.Validator;
import org.apache.commons.logging.*;
+import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
+import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
@@ -66,7 +64,7 @@
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.filecache.*;
import org.apache.hadoop.util.*;
-import org.apache.log4j.helpers.OptionConverter;
+
/** All the client-side work happens here.
* (Jar packaging, MapRed job submission and monitoring)
* @author Michel Tourn
@@ -213,7 +211,6 @@
verbose_ = cmdLine.hasOption("-verbose");
detailedUsage_ = cmdLine.hasOption("-info");
debug_ = cmdLine.hasOption("-debug")? debug_ + 1 : debug_;
- inputTagged_ = cmdLine.hasOption("-inputtagged");
inputSpecs_.addAll(cmdLine.getValues("-input"));
output_ = (String) cmdLine.getValue("-output");
@@ -709,19 +706,14 @@
if (inReaderSpec_ == null && inputFormatSpec_ == null) {
fmt = KeyValueTextInputFormat.class;
} else if (inputFormatSpec_ != null) {
- if ((inputFormatSpec_.compareToIgnoreCase("KeyValueTextInputFormat") == 0)
- || (inputFormatSpec_
- .compareToIgnoreCase("org.apache.hadoop.mapred.KeyValueTextInputFormat") == 0)) {
+ if (inputFormatSpec_.equals(KeyValueTextInputFormat.class.getName())
+ || inputFormatSpec_.equals(KeyValueTextInputFormat.class.getCanonicalName())) {
fmt = KeyValueTextInputFormat.class;
- } else if ((inputFormatSpec_
- .compareToIgnoreCase("SequenceFileInputFormat") == 0)
- || (inputFormatSpec_
- .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileInputFormat") == 0)) {
+ } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class.getName())
+ || inputFormatSpec_.equals(SequenceFileInputFormat.class.getCanonicalName())) {
fmt = SequenceFileInputFormat.class;
- } else if ((inputFormatSpec_
- .compareToIgnoreCase("SequenceFileToLineInputFormat") == 0)
- || (inputFormatSpec_
- .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileToLineInputFormat") == 0)) {
+ } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getName())
+ || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getCanonicalName())) {
fmt = SequenceFileAsTextInputFormat.class;
} else {
c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage);
@@ -774,12 +766,19 @@
reducerNone_ = false;
if (redCmd_ != null) {
reducerNone_ = redCmd_.equals(REDUCE_NONE);
- c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
- if (c != null) {
- jobConf_.setReducerClass(c);
+ if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
+ jobConf_.setReducerClass(ValueAggregatorReducer.class);
+ jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
} else {
- jobConf_.setReducerClass(PipeReducer.class);
- jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(redCmd_, "UTF-8"));
+
+ c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
+ if (c != null) {
+ jobConf_.setReducerClass(c);
+ } else {
+ jobConf_.setReducerClass(PipeReducer.class);
+ jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
+ redCmd_, "UTF-8"));
+ }
}
}
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java?view=auto&rev=534234
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java Tue May 1 13:38:17 2007
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.streaming.Environment;
+
+/**
+ * Mapper used by TestStreamAggregate: reads lines from stdin and, for each
+ * space-separated word, prints "LongValueSum:<word>\t1" for the aggregate reducer.
+ */
+public class StreamAggregate extends TrApp
+{
+  public StreamAggregate()
+  {
+    super('.', ' ');
+  }
+
+  public void go() throws IOException
+  {
+    testParentJobConfToEnvVars();
+    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+    String line;
+
+    while ((line = in.readLine()) != null) {
+      String [] words = line.split(" ");
+      for (int i = 0; i< words.length; i++) {
+        String word = words[i].trim();
+        if (word.length() == 0) continue; // leading/repeated spaces yield empty tokens; skip them
+        System.out.println("LongValueSum:" + word + "\t" + "1");
+      }
+    }
+  }
+
+  public static void main(String[] args) throws IOException
+  {
+    TrApp app = new StreamAggregate();
+    app.go();
+  }
+}
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java?view=auto&rev=534234
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java Tue May 1 13:38:17 2007
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests hadoopStreaming in MapReduce local mode.
+ * It uses Hadoop Aggregate to count the numbers of word occurrences
+ * in the input.
+ */
+public class TestStreamAggregate extends TestCase
+{
+ protected File INPUT_FILE = new File("stream_aggregate_input.txt");
+ protected File OUTPUT_DIR = new File("stream_aggregate_out");
+ protected String input = "roses are red\nviolets are blue\nbunnies are pink\n";
+ // map parses input lines and generates count entries for each word.
+ protected String map = StreamUtil.makeJavaCommand(StreamAggregate.class, new String[]{".", "\\n"});
+ // Use the aggregate combine, reducei to aggregate the counts
+ protected String outputExpect = "are\t3\nblue\t1\nbunnies\t1\npink\t1\nred\t1\nroses\t1\nviolets\t1\n";
+
+ private StreamJob job;
+
+ public TestStreamAggregate() throws IOException
+ {
+ UtilTest utilTest = new UtilTest(getClass().getName());
+ utilTest.checkUserDir();
+ utilTest.redirectIfAntJunit();
+ }
+
+ protected void createInput() throws IOException
+ {
+ DataOutputStream out = new DataOutputStream(
+ new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+ out.write(input.getBytes("UTF-8"));
+ out.close();
+ }
+
+ protected String[] genArgs() {
+ return new String[] {
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper", map,
+ "-reducer", "aggregate",
+ //"-verbose",
+ //"-jobconf", "stream.debug=set"
+ "-jobconf", "keep.failed.task.files=true",
+ "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+ };
+ }
+
+ public void testCommandLine()
+ {
+ try {
+ try {
+ OUTPUT_DIR.getAbsoluteFile().delete();
+ } catch (Exception e) {
+ }
+
+ createInput();
+ boolean mayExit = false;
+
+ // During tests, the default Configuration will use a local mapred
+ // So don't specify -config or -cluster
+ job = new StreamJob(genArgs(), mayExit);
+ job.go();
+ File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+ String output = StreamUtil.slurp(outFile);
+ outFile.delete();
+ System.err.println("outEx1=" + outputExpect);
+ System.err.println(" out1=" + output);
+ assertEquals(outputExpect, output);
+ } catch(Exception e) {
+ failTrace(e);
+ } finally {
+ File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+ INPUT_FILE.delete();
+ outFileCRC.delete();
+ OUTPUT_DIR.getAbsoluteFile().delete();
+ }
+ }
+
+ private void failTrace(Exception e)
+ {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ fail(sw.toString());
+ }
+
+ public static void main(String[]args) throws Exception
+ {
+ new TestStreaming().testCommandLine();
+ }
+
+}