You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/03/23 22:15:54 UTC
svn commit: r521906 - in /lucene/hadoop/trunk: ./
src/contrib/abacus/src/java/org/apache/hadoop/abacus/
src/contrib/data_join/ src/contrib/data_join/src/
src/contrib/data_join/src/java/ src/contrib/data_join/src/java/org/
src/contrib/data_join/src/java...
Author: cutting
Date: Fri Mar 23 14:15:53 2007
New Revision: 521906
URL: http://svn.apache.org/viewvc?view=rev&rev=521906
Log:
HADOOP-1120. Add contrib/data_join, tools to simplify joining data from multiple sources using MapReduce. Contributed by Runping Qi.
Added:
lucene/hadoop/trunk/src/contrib/data_join/
lucene/hadoop/trunk/src/contrib/data_join/build.xml
lucene/hadoop/trunk/src/contrib/data_join/src/
lucene/hadoop/trunk/src/contrib/data_join/src/java/
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/JobBase.java
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ResetableIterator.java
lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/TaggedMapOutput.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/build.xml
lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java
lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=521906&r1=521905&r2=521906
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Mar 23 14:15:53 2007
@@ -18,6 +18,9 @@
5. HADOOP-1116. Increase heap size used for contrib unit tests.
(Philippe Gassmann via cutting)
+ 6. HADOOP-1120. Add contrib/data_join, tools to simplify joining
+ data from multiple sources using MapReduce. (Runping Qi via cutting)
+
Release 0.12.2 - 2007-23-17
Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=521906&r1=521905&r2=521906
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Fri Mar 23 14:15:53 2007
@@ -535,6 +535,7 @@
<packageset dir="src/contrib/streaming/src/java"/>
<packageset dir="src/contrib/abacus/src/java"/>
+ <packageset dir="src/contrib/data_join/src/java"/>
<link href="${javadoc.link.java}"/>
<classpath refid="classpath"/>
@@ -544,6 +545,7 @@
<group title="contrib: Streaming" packages="org.apache.hadoop.streaming*"/>
<group title="contrib: Abacus" packages="org.apache.hadoop.abacus*"/>
+ <group title="contrib: DataJoin" packages="org.apache.hadoop.contrib/join*"/>
</javadoc>
</target>
Modified: lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java?view=diff&rev=521906&r1=521905&r2=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java (original)
+++ lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java Fri Mar 23 14:15:53 2007
@@ -18,6 +18,7 @@
package org.apache.hadoop.abacus;
+import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.Iterator;
@@ -37,9 +38,9 @@
public static final Log LOG = LogFactory.getLog("abacus.job");
- private TreeMap<Object, Long> longCounters = null;
+ private SortedMap<Object, Long> longCounters = null;
- private TreeMap<Object, Double> doubleCounters = null;
+ private SortedMap<Object, Double> doubleCounters = null;
/**
* Set the given counter to the given value
@@ -154,7 +155,7 @@
* the configuration
*/
public void configure(JobConf job) {
- this.longCounters = new TreeMap();
- this.doubleCounters = new TreeMap();
+ this.longCounters = new TreeMap<Object,Long> ();
+ this.doubleCounters = new TreeMap<Object,Double> ();
}
}
Modified: lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java?view=diff&rev=521906&r1=521905&r2=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java Fri Mar 23 14:15:53 2007
@@ -101,7 +101,7 @@
throws IOException {
if (args.length < 2) {
- System.out.println("usage: inputDirs outDir [numOfReducer [textinputformat|seq [specfile]]]");
+ System.out.println("usage: inputDirs outDir [numOfReducer [textinputformat|seq [specfile [jobName]]]]");
System.exit(1);
}
String inputDir = args[0];
@@ -122,12 +122,18 @@
specFile = new Path(args[4]);
}
+ String jobName = "";
+
+ if (args.length > 5) {
+ jobName = args[5];
+ }
+
JobConf theJob = new JobConf(ValueAggregatorJob.class);
if (specFile != null) {
theJob.addDefaultResource(specFile);
}
FileSystem fs = FileSystem.get(theJob);
- theJob.setJobName("ValueAggregatorJob");
+ theJob.setJobName("ValueAggregatorJob: " + jobName);
String[] inputDirsSpecs = inputDir.split(",");
for (int i = 0; i < inputDirsSpecs.length; i++) {
Added: lucene/hadoop/trunk/src/contrib/data_join/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/build.xml?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/build.xml (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/build.xml Fri Mar 23 14:15:53 2007
@@ -0,0 +1,23 @@
+<?xml version="1.0"?>
+
+<!--
+Before you can run these subtargets directly, you need
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="datajoin" default="jar">
+
+ <import file="../build-contrib.xml"/>
+
+ <!-- Override jar target to specify main class -->
+ <target name="jar" depends="compile">
+ <jar
+ jarfile="${build.dir}/hadoop-${name}.jar"
+ basedir="${build.classes}"
+ >
+ <manifest>
+ <attribute name="Main-Class" value="org.apache.hadoop.contrib.utils.join.DataJoinJob"/>
+ </manifest>
+ </jar>
+ </target>
+
+</project>
Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * This class provides an implementation of ResetableIterator. The
+ * implementation will be based on ArrayList.
+ *
+ * @author runping
+ *
+ */
+public class ArrayListBackedIterator implements ResetableIterator {
+
+ private Iterator iter;
+
+ private ArrayList data;
+
+ public ArrayListBackedIterator() {
+ this(new ArrayList());
+ }
+
+ public ArrayListBackedIterator(ArrayList data) {
+ this.data = data;
+ this.iter = this.data.iterator();
+ }
+
+ public void add(Object item) {
+ this.data.add(item);
+ }
+
+ public boolean hasNext() {
+ return this.iter.hasNext();
+ }
+
+ public Object next() {
+ return this.iter.next();
+ }
+
+ public void remove() {
+
+ }
+
+ public void reset() {
+ this.iter = this.data.iterator();
+ }
+
+ public void close() throws IOException {
+ this.iter = null;
+ this.data = null;
+ }
+}
Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+/**
+ * This class implements the main function for creating a map/reduce
+ * job to join data of different sources. To create such a job, the
+ * user must implement a mapper class that extends DataJoinMapperBase class,
+ * and a reducer class that extends DataJoinReducerBase.
+ *
+ * @author runping
+ *
+ */
+public class DataJoinJob {
+
+ public static Class getClassByName(String className) {
+ Class retv = null;
+ try {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ retv = Class.forName(className, true, classLoader);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return retv;
+ }
+
+ public static JobConf createDataJoinJob(String args[]) throws IOException {
+
+ String inputDir = args[0];
+ String outputDir = args[1];
+ int numOfReducers = Integer.parseInt(args[2]);
+ Class mapper = getClassByName(args[3]);
+ Class reducer = getClassByName(args[4]);
+ Class mapoutputValueClass = getClassByName(args[5]);
+ Class outputFormat = TextOutputFormat.class;
+ Class outputValueClass = Text.class;
+ if (args[6].compareToIgnoreCase("text") != 0) {
+ System.out.println("Using SequenceFileOutputFormat: " + args[6]);
+ outputFormat = SequenceFileOutputFormat.class;
+ outputValueClass = getClassByName(args[6]);
+ } else {
+ System.out.println("Using TextOutputFormat: " + args[6]);
+ }
+ long maxNumOfValuesPerGroup = 100;
+ String jobName = "";
+ if (args.length > 7) {
+ maxNumOfValuesPerGroup = Long.parseLong(args[7]);
+ }
+ if (args.length > 8) {
+ jobName = args[8];
+ }
+ Configuration defaults = new Configuration();
+ JobConf job = new JobConf(defaults, DataJoinJob.class);
+ job.setJobName("DataJoinJob: " + jobName);
+
+ FileSystem fs = FileSystem.get(defaults);
+ fs.delete(new Path(outputDir));
+ String[] inputDirsSpecs = inputDir.split(",");
+ for (int i = 0; i < inputDirsSpecs.length; i++) {
+ String spec = inputDirsSpecs[i];
+ job.addInputPath(new Path(spec));
+ }
+
+ job.setInputFormat(SequenceFileInputFormat.class);
+
+ job.setMapperClass(mapper);
+ job.setOutputPath(new Path(outputDir));
+ job.setOutputFormat(outputFormat);
+ SequenceFile.setCompressionType(job,
+ SequenceFile.CompressionType.BLOCK);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(mapoutputValueClass);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(outputValueClass);
+ job.setReducerClass(reducer);
+
+ job.setNumMapTasks(1);
+ job.setNumReduceTasks(numOfReducers);
+ job.setLong("ultjoin.maxNumOfValuesPerGroup",
+ maxNumOfValuesPerGroup);
+ job.set("mapred.child.java.opts", "-Xmx1024m");
+ job.setKeepFailedTaskFiles(true);
+ return job;
+ }
+
+ /**
+ * Submit/run a map/reduce job.
+ *
+ * @param job
+ * @return true for success
+ * @throws IOException
+ */
+ public static boolean runJob(JobConf job) throws IOException {
+ JobClient jc = new JobClient(job);
+ boolean sucess = true;
+ RunningJob running = null;
+ try {
+ running = jc.submitJob(job);
+ String jobId = running.getJobID();
+ System.out.println("Job " + jobId + " is submitted");
+ while (!running.isComplete()) {
+ System.out.println("Job " + jobId + " is still running.");
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e) {
+ }
+ running = jc.getJob(jobId);
+ }
+ sucess = running.isSuccessful();
+ } finally {
+ if (!sucess && (running != null)) {
+ running.killJob();
+ }
+ jc.close();
+ }
+ return sucess;
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ boolean success;
+ if (args.length < 7 || args.length > 9) {
+ System.out.println("usage: DataJoinJob " + "inputdirs outputdir "
+ + "numofParts " + "mapper_class " + "reducer_class "
+ + "map_output_value_class "
+ + "output_value_class [maxNumOfValuesPerGroup [descriptionOfJob]]]");
+ System.exit(-1);
+ }
+
+ try {
+ JobConf job = DataJoinJob.createDataJoinJob(args);
+ success = DataJoinJob.runJob(job);
+ if (!success) {
+ System.out.println("Job failed");
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ }
+}
Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This abstract class serves as the base class for the mapper class of a data
+ * join job. This class expects its subclasses to implement methods for the
+ * following functionalities:
+ *
+ * 1. Compute the source tag of input values 2. Compute the map output value
+ * object 3. Compute the map output key object
+ *
+ * The source tag will be used by the reducer to determine from which source
+ * (which table in SQL terminology) a value comes. Computing the map output
+ * value object amounts to performing projecting/filtering work in a SQL
+ * statement (through the select/where clauses). Computing the map output key
+ * amounts to choosing the join key. This class provides the appropriate plugin
+ * points for the user defined subclasses to implement the appropriate logic.
+ *
+ */
+public abstract class DataJoinMapperBase extends JobBase {
+
+ protected String inputFile = null;
+
+ protected JobConf job = null;
+
+ protected Text inputTag = null;
+
+ protected Reporter reporter = null;
+
+ public void configure(JobConf job) {
+ super.configure(job);
+ this.job = job;
+ this.inputFile = job.get("map.input.file");
+ this.inputTag = generateInputTag(this.inputFile);
+ }
+
+ /**
+ * Determine the source tag based on the input file name.
+ *
+ * @param inputFile
+ * @return the source tag computed from the given file name.
+ */
+ protected abstract Text generateInputTag(String inputFile);
+
+ /**
+ * Generate a tagged map output value. The user code can also perform
+ * projection/filtering. If it decides to discard the input record when
+ * certain conditions are met, it can simply return null.
+ *
+ * @param value
+ * @return an object of TaggedMapOutput computed from the given value.
+ */
+ protected abstract TaggedMapOutput generateTaggedMapOutput(Writable value);
+
+ /**
+ * Generate a map output key. The user code can compute the key
+ * programmatically, not just selecting the values of some fields. In this
+ * sense, it is more general than the joining capabilities of SQL.
+ *
+ * @param aRecord
+ * @return the group key for the given record
+ */
+ protected abstract Text generateGroupKey(TaggedMapOutput aRecord);
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector output, Reporter reporter) throws IOException {
+ if (this.reporter == null) {
+ this.reporter = reporter;
+ }
+ addLongValue("totalCount", 1);
+ TaggedMapOutput aRecord = generateTaggedMapOutput(value);
+ if (aRecord == null) {
+ addLongValue("discardedCount", 1);
+ return;
+ }
+ Text groupKey = generateGroupKey(aRecord);
+ if (groupKey == null) {
+ addLongValue("nullGroupKeyCount", 1);
+ return;
+ }
+ output.collect(groupKey, aRecord);
+ addLongValue("collectedCount", 1);
+ }
+
+ public void close() throws IOException {
+ if (this.reporter != null) {
+ this.reporter.setStatus(super.getReport());
+ }
+ }
+
+ public void reduce(WritableComparable arg0, Iterator arg1,
+ OutputCollector arg2, Reporter arg3) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+}
Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This abstract class serves as the base class for the reducer class of a data
+ * join job. The reduce function will first group the values according to their
+ * input tags, and then compute the cross product over the groups. For each
+ * tuple in the cross product, it calls the following method, which is expected
+ * to be implemented in a subclass.
+ *
+ * protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
+ *
+ * The above method is expected to produce one output value from an array of
+ * records of different sources. The user code can also perform filtering here.
+ * It can return null if it decides that the records do not meet certain
+ * conditions.
+ *
+ */
+public abstract class DataJoinReducerBase extends JobBase {
+
+ protected Reporter reporter = null;
+
+ private long maxNumOfValuesPerGroup = 100;
+
+ protected long largestNumOfValues = 0;
+
+ protected long numOfValues = 0;
+
+ protected long collected = 0;
+
+ protected JobConf job;
+
+ public void close() throws IOException {
+ if (this.reporter != null) {
+ this.reporter.setStatus(super.getReport());
+ }
+ }
+
+ public void configure(JobConf job) {
+ super.configure(job);
+ this.job = job;
+ this.maxNumOfValuesPerGroup = job.getLong("ultjoin.maxNumOfValuesPerGroup",
+ 100);
+ }
+
+ /**
+ * The subclass can provide a different implementation of ResetableIterator.
+ * This is necessary if the number of values in a reduce call is very high.
+ *
+ * The default provided here uses ArrayListBackedIterator
+ *
+ * @return an Object of ResetableIterator.
+ */
+ protected ResetableIterator createResetableIterator() {
+ return new ArrayListBackedIterator();
+ }
+
+ /**
+ * This is the function that re-groups values for a key into sub-groups based
+ * on a secondary key (input tag).
+ *
+ * @param arg1
+ * @return
+ */
+ private SortedMap<Object, ResetableIterator> regroup(Writable key,
+ Iterator arg1, Reporter reporter) throws IOException {
+ this.numOfValues = 0;
+ SortedMap<Object, ResetableIterator> retv = new TreeMap<Object, ResetableIterator>();
+ TaggedMapOutput aRecord = null;
+ while (arg1.hasNext()) {
+ aRecord = (TaggedMapOutput) arg1.next();
+ this.numOfValues += 1;
+ if (this.numOfValues % 100 == 0) {
+ reporter.setStatus("key: " + key.toString() + " numOfValues: "
+ + this.numOfValues);
+ }
+ if (this.numOfValues > this.maxNumOfValuesPerGroup) {
+ continue;
+ }
+ Object tag = aRecord.getTag();
+ ResetableIterator data = retv.get(tag);
+ if (data == null) {
+ data = createResetableIterator();
+ retv.put(tag, data);
+ }
+ data.add(aRecord);
+ }
+ if (this.numOfValues > this.largestNumOfValues) {
+ this.largestNumOfValues = numOfValues;
+ LOG.info("key: " + key.toString() + " this.largestNumOfValues: "
+ + this.largestNumOfValues);
+ }
+ return retv;
+ }
+
+ public void reduce(WritableComparable key, Iterator values,
+ OutputCollector output, Reporter reporter) throws IOException {
+ if (this.reporter == null) {
+ this.reporter = reporter;
+ }
+
+ SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter);
+ Object[] tags = groups.keySet().toArray();
+ ResetableIterator[] groupValues = new ResetableIterator[tags.length];
+ for (int i = 0; i < tags.length; i++) {
+ groupValues[i] = groups.get(tags[i]);
+ }
+ joinAndCollect(tags, groupValues, key, output, reporter);
+ addLongValue("groupCount", 1);
+ for (int i = 0; i < tags.length; i++) {
+ groupValues[i].close();
+ }
+ }
+
+ /**
+ * The subclass can override this method to perform additional filtering
+ * and/or other processing logic before a value is collected.
+ *
+ * @param key
+ * @param aRecord
+ * @param output
+ * @param reporter
+ * @throws IOException
+ */
+ protected void collect(WritableComparable key, TaggedMapOutput aRecord,
+ OutputCollector output, Reporter reporter) throws IOException {
+ this.collected += 1;
+ addLongValue("collectedCount", 1);
+ if (aRecord != null && this.collected % 1 == 0) {
+ output.collect(key, aRecord.getData());
+ reporter.setStatus("key: " + key.toString() + " collected: " + collected);
+ addLongValue("actuallyCollectedCount", 1);
+ }
+ }
+
+ /**
+ * join the list of the value lists, and collect the results.
+ *
+ * @param tags
+ * a list of input tags
+ * @param values
+ * a list of value lists, each corresponding to one input source
+ * @param key
+ * @param output
+ * @throws IOException
+ */
+ private void joinAndCollect(Object[] tags, ResetableIterator[] values,
+ WritableComparable key, OutputCollector output, Reporter reporter)
+ throws IOException {
+ if (values.length < 1) {
+ return;
+ }
+ Object[] partialList = new Object[values.length];
+ joinAndCollect(tags, values, 0, partialList, key, output, reporter);
+ }
+
+ /**
+ * Perform the actual join recursively.
+ *
+ * @param tags
+ * a list of input tags
+ * @param values
+ * a list of value lists, each corresponding to one input source
+ * @param pos
+ * indicating the next value list to be joined
+ * @param partialList
+ * a list of values, each from one value list considered so far.
+ * @param key
+ * @param output
+ * @throws IOException
+ */
+ private void joinAndCollect(Object[] tags, ResetableIterator[] values,
+ int pos, Object[] partialList, WritableComparable key,
+ OutputCollector output, Reporter reporter) throws IOException {
+
+ if (values.length == pos) {
+ // get a value from each source. Combine them
+ TaggedMapOutput combined = combine(tags, partialList);
+ collect(key, combined, output, reporter);
+ return;
+ }
+ ResetableIterator nextValues = values[pos];
+ nextValues.reset();
+ while (nextValues.hasNext()) {
+ Object v = nextValues.next();
+ partialList[pos] = v;
+ joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter);
+ }
+ }
+
+ public static Text SOURCE_TAGS_FIELD = new Text("SOURCE_TAGS");
+
+ public static Text NUM_OF_VALUES_FIELD = new Text("NUM_OF_VALUES");
+
+ /**
+ *
+ * @param tags
+ * a list of source tags
+ * @param values
+ * a value per source
+ * @return combined value derived from values of the sources
+ */
+ protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
+
+ public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+ Reporter arg3) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+}
Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/JobBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/JobBase.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/JobBase.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/JobBase.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+ * A common base implementing some statics collecting mechanisms that are
+ * commonly used in a typical map/reduce job.
+ *
+ */
+public abstract class JobBase implements Mapper, Reducer {
+
+ public static final Log LOG = LogFactory.getLog("datajoin.job");
+
+ private SortedMap<Object, Long> longCounters = null;
+
+ private SortedMap<Object, Double> doubleCounters = null;
+
+ /**
+ * Set the given counter to the given value
+ *
+ * @param name
+ * the counter name
+ * @param value
+ * the value for the counter
+ */
+ protected void setLongValue(Object name, long value) {
+ this.longCounters.put(name, new Long(value));
+ }
+
+ /**
+ * Set the given counter to the given value
+ *
+ * @param name
+ * the counter name
+ * @param value
+ * the value for the counter
+ */
+ protected void setDoubleValue(Object name, double value) {
+ this.doubleCounters.put(name, new Double(value));
+ }
+
+ /**
+ *
+ * @param name
+ * the counter name
+ * @return return the value of the given counter.
+ */
+ protected Long getLongValue(Object name) {
+ return this.longCounters.get(name);
+ }
+
+ /**
+ *
+ * @param name
+ * the counter name
+ * @return return the value of the given counter.
+ */
+ protected Double getDoubleValue(Object name) {
+ return this.doubleCounters.get(name);
+ }
+
+ /**
+ * Increment the given counter by the given incremental value If the counter
+ * does not exist, one is created with value 0.
+ *
+ * @param name
+ * the counter name
+ * @param inc
+ * the incremental value
+ * @return the updated value.
+ */
+ protected Long addLongValue(Object name, long inc) {
+ Long val = this.longCounters.get(name);
+ Long retv = null;
+ if (val == null) {
+ retv = new Long(inc);
+ } else {
+ retv = new Long(val.longValue() + inc);
+ }
+ this.longCounters.put(name, retv);
+ return retv;
+ }
+
+ /**
+ * Increment the given counter by the given incremental value If the counter
+ * does not exist, one is created with value 0.
+ *
+ * @param name
+ * the counter name
+ * @param inc
+ * the incremental value
+ * @return the updated value.
+ */
+ protected Double addDoubleValue(Object name, double inc) {
+ Double val = this.doubleCounters.get(name);
+ Double retv = null;
+ if (val == null) {
+ retv = new Double(inc);
+ } else {
+ retv = new Double(val.doubleValue() + inc);
+ }
+ this.doubleCounters.put(name, retv);
+ return retv;
+ }
+
+ /**
+ * log the counters
+ *
+ */
+ protected void report() {
+ LOG.info(getReport());
+ }
+
+ /**
+ * log the counters
+ *
+ */
+ protected String getReport() {
+ StringBuffer sb = new StringBuffer();
+
+ Iterator iter = this.longCounters.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry e = (Entry) iter.next();
+ sb.append(e.getKey().toString()).append("\t").append(e.getValue())
+ .append("\n");
+ }
+ iter = this.doubleCounters.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry e = (Entry) iter.next();
+ sb.append(e.getKey().toString()).append("\t").append(e.getValue())
+ .append("\n");
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Initializes a new instance from a {@link JobConf}.
+ *
+ * @param job
+ * the configuration
+ */
+ public void configure(JobConf job) {
+ this.longCounters = new TreeMap<Object, Long>();
+ this.doubleCounters = new TreeMap<Object, Double>();
+ }
+}
Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ResetableIterator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ResetableIterator.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ResetableIterator.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ResetableIterator.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * An {@link Iterator} that can additionally be reset, appended to and closed.
+ * It helps the reducer class re-group the values in the values iterator of
+ * the reduce method according to their source tags. Once the values are
+ * re-grouped, the reducer can perform the cross product over the values in
+ * different groups.
+ *
+ * @author runping
+ *
+ */
+public interface ResetableIterator extends Iterator {
+
+  /**
+   * Resets this iterator. NOTE(review): presumably this rewinds to the first
+   * added item so the values can be walked again - confirm against the
+   * implementations.
+   */
+  void reset();
+
+  /**
+   * Adds an item to this iterator.
+   *
+   * @param item
+   *          the item to add
+   */
+  void add(Object item);
+
+  /**
+   * Closes this iterator.
+   *
+   * @throws IOException
+   *           declared for implementations whose close can fail on I/O
+   */
+  void close() throws IOException;
+}
Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/TaggedMapOutput.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/TaggedMapOutput.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/TaggedMapOutput.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/TaggedMapOutput.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Base class for the values that flow from the mappers to the reducers in a
+ * data join job. Typically, in such a job, the mappers will compute the
+ * source tag of an input record based on its attributes or based on the file
+ * name of the input file. This tag will be used by the reducers to re-group
+ * the values of a given key according to their source tags.
+ *
+ * @author runping
+ *
+ */
+public abstract class TaggedMapOutput implements Writable {
+
+  /** The source tag of this value; starts out as an empty Text. */
+  protected Text tag = new Text("");
+
+  /** Creates an instance whose tag is an empty Text. */
+  public TaggedMapOutput() {
+  }
+
+  /**
+   * @return the payload carried by this tagged value
+   */
+  public abstract Writable getData();
+
+  /**
+   * @return the current tag object itself, not a copy
+   */
+  public Text getTag() {
+    return this.tag;
+  }
+
+  /**
+   * Replaces the tag. The given object is stored as is, without copying.
+   *
+   * @param tag
+   *          the new tag
+   */
+  public void setTag(Text tag) {
+    this.tag = tag;
+  }
+
+}