You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/03/23 22:15:54 UTC

svn commit: r521906 - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/ src/contrib/data_join/src/ src/contrib/data_join/src/java/ src/contrib/data_join/src/java/org/ src/contrib/data_join/src/java...

Author: cutting
Date: Fri Mar 23 14:15:53 2007
New Revision: 521906

URL: http://svn.apache.org/viewvc?view=rev&rev=521906
Log:
HADOOP-1120.  Add contrib/data_join, tools to simplify joining data from multiple sources using MapReduce.  Contributed by Runping Qi.

Added:
    lucene/hadoop/trunk/src/contrib/data_join/
    lucene/hadoop/trunk/src/contrib/data_join/build.xml
    lucene/hadoop/trunk/src/contrib/data_join/src/
    lucene/hadoop/trunk/src/contrib/data_join/src/java/
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/JobBase.java
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ResetableIterator.java
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/TaggedMapOutput.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java
    lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=521906&r1=521905&r2=521906
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Mar 23 14:15:53 2007
@@ -18,6 +18,9 @@
  5. HADOOP-1116.  Increase heap size used for contrib unit tests.
     (Philippe Gassmann via cutting)
 
+ 6. HADOOP-1120.  Add contrib/data_join, tools to simplify joining
+    data from multiple sources using MapReduce.  (Runping Qi via cutting)
+
 
 Release 0.12.2 - 2007-23-17
 

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=521906&r1=521905&r2=521906
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Fri Mar 23 14:15:53 2007
@@ -535,6 +535,7 @@
 
     	<packageset dir="src/contrib/streaming/src/java"/>
     	<packageset dir="src/contrib/abacus/src/java"/>
+    	<packageset dir="src/contrib/data_join/src/java"/>
 
         <link href="${javadoc.link.java}"/>
         <classpath refid="classpath"/>
@@ -544,6 +545,7 @@
 
        <group title="contrib: Streaming" packages="org.apache.hadoop.streaming*"/>
        <group title="contrib: Abacus" packages="org.apache.hadoop.abacus*"/>
+       <group title="contrib: DataJoin" packages="org.apache.hadoop.contrib/join*"/>
 
     </javadoc>
   </target>	

Modified: lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java?view=diff&rev=521906&r1=521905&r2=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java (original)
+++ lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/JobBase.java Fri Mar 23 14:15:53 2007
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.abacus;
 
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.Map.Entry;
 import java.util.Iterator;
@@ -37,9 +38,9 @@
 
   public static final Log LOG = LogFactory.getLog("abacus.job");
 
-  private TreeMap<Object, Long> longCounters = null;
+  private SortedMap<Object, Long> longCounters = null;
 
-  private TreeMap<Object, Double> doubleCounters = null;
+  private SortedMap<Object, Double> doubleCounters = null;
 
   /**
    * Set the given counter to the given value
@@ -154,7 +155,7 @@
    *          the configuration
    */
   public void configure(JobConf job) {
-    this.longCounters = new TreeMap();
-    this.doubleCounters = new TreeMap();
+    this.longCounters = new TreeMap<Object,Long> ();
+    this.doubleCounters = new TreeMap<Object,Double> ();
   }
 }

Modified: lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java?view=diff&rev=521906&r1=521905&r2=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/abacus/src/java/org/apache/hadoop/abacus/ValueAggregatorJob.java Fri Mar 23 14:15:53 2007
@@ -101,7 +101,7 @@
       throws IOException {
 
     if (args.length < 2) {
-      System.out.println("usage: inputDirs outDir [numOfReducer [textinputformat|seq [specfile]]]");
+      System.out.println("usage: inputDirs outDir [numOfReducer [textinputformat|seq [specfile [jobName]]]]");
       System.exit(1);
     }
     String inputDir = args[0];
@@ -122,12 +122,18 @@
       specFile = new Path(args[4]);
     }
 
+    String jobName = "";
+    
+    if (args.length > 5) {
+      jobName = args[5];
+    }
+    
     JobConf theJob = new JobConf(ValueAggregatorJob.class);
     if (specFile != null) {
       theJob.addDefaultResource(specFile);
     }
     FileSystem fs = FileSystem.get(theJob);
-    theJob.setJobName("ValueAggregatorJob");
+    theJob.setJobName("ValueAggregatorJob: " + jobName);
 
     String[] inputDirsSpecs = inputDir.split(",");
     for (int i = 0; i < inputDirsSpecs.length; i++) {

Added: lucene/hadoop/trunk/src/contrib/data_join/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/build.xml?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/build.xml (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/build.xml Fri Mar 23 14:15:53 2007
@@ -0,0 +1,23 @@
+<?xml version="1.0"?>
+
+<!-- 
+Before you can run these subtargets directly, you need 
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="datajoin" default="jar">
+
+  <import file="../build-contrib.xml"/>
+
+  <!-- Override jar target to specify main class -->
+  <target name="jar" depends="compile">
+    <jar
+      jarfile="${build.dir}/hadoop-${name}.jar"
+      basedir="${build.classes}"      
+    >
+  	<manifest>
+	    <attribute name="Main-Class" value="org.apache.hadoop.contrib.utils.join.DataJoinJob"/>
+	</manifest>
+    </jar>
+  </target>
+
+</project>

Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * This class provides an implementation of ResetableIterator. The
+ * implementation will be based on ArrayList.
+ * 
+ * @author runping
+ * 
+ */
+public class ArrayListBackedIterator implements ResetableIterator {
+
+  private Iterator iter;
+
+  private ArrayList data;
+
+  public ArrayListBackedIterator() {
+    this(new ArrayList());
+  }
+
+  public ArrayListBackedIterator(ArrayList data) {
+    this.data = data;
+    this.iter = this.data.iterator();
+  }
+
+  public void add(Object item) {
+    this.data.add(item);
+  }
+
+  public boolean hasNext() {
+    return this.iter.hasNext();
+  }
+
+  public Object next() {
+    return this.iter.next();
+  }
+
+  public void remove() {
+
+  }
+
+  public void reset() {
+    this.iter = this.data.iterator();
+  }
+
+  public void close() throws IOException {
+    this.iter = null;
+    this.data = null;
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+/**
+ * This class implements the main function for creating a map/reduce
+ * job to join data of different sources. To create such a job, the 
+ * user must implement a mapper class that extends DataJoinMapperBase class,
+ * and a reducer class that extends DataJoinReducerBase. 
+ * 
+ * @author runping
+ *
+ */
+public class DataJoinJob {
+
+  public static Class getClassByName(String className) {
+    Class retv = null;
+    try {
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      retv = Class.forName(className, true, classLoader);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return retv;
+  }
+
+  public static JobConf createDataJoinJob(String args[]) throws IOException {
+
+    String inputDir = args[0];
+    String outputDir = args[1];
+    int numOfReducers = Integer.parseInt(args[2]);
+    Class mapper = getClassByName(args[3]);
+    Class reducer = getClassByName(args[4]);
+    Class mapoutputValueClass = getClassByName(args[5]);
+    Class outputFormat = TextOutputFormat.class;
+    Class outputValueClass = Text.class;
+    if (args[6].compareToIgnoreCase("text") != 0) {
+      System.out.println("Using SequenceFileOutputFormat: " + args[6]);
+      outputFormat = SequenceFileOutputFormat.class;
+      outputValueClass = getClassByName(args[6]);
+    } else {
+      System.out.println("Using TextOutputFormat: " + args[6]);
+    }
+    long maxNumOfValuesPerGroup = 100;
+    String jobName = "";
+    if (args.length > 7) {
+      maxNumOfValuesPerGroup = Long.parseLong(args[7]);
+    }
+    if (args.length > 8) {
+      jobName = args[8];
+    }
+    Configuration defaults = new Configuration();
+    JobConf job = new JobConf(defaults, DataJoinJob.class);
+    job.setJobName("DataJoinJob: " + jobName);
+
+    FileSystem fs = FileSystem.get(defaults);
+    fs.delete(new Path(outputDir));
+    String[] inputDirsSpecs = inputDir.split(",");
+    for (int i = 0; i < inputDirsSpecs.length; i++) {
+      String spec = inputDirsSpecs[i];
+      job.addInputPath(new Path(spec));
+    }
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(mapper);
+    job.setOutputPath(new Path(outputDir));
+    job.setOutputFormat(outputFormat);
+    SequenceFile.setCompressionType(job,
+        SequenceFile.CompressionType.BLOCK);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(mapoutputValueClass);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(outputValueClass);
+    job.setReducerClass(reducer);
+
+    job.setNumMapTasks(1);
+    job.setNumReduceTasks(numOfReducers);
+    job.setLong("ultjoin.maxNumOfValuesPerGroup",
+        maxNumOfValuesPerGroup);
+    job.set("mapred.child.java.opts", "-Xmx1024m");
+    job.setKeepFailedTaskFiles(true);
+    return job;
+  }
+
+  /**
+   * Submit/run a map/reduce job.
+   * 
+   * @param job
+   * @return true for success
+   * @throws IOException
+   */
+  public static boolean runJob(JobConf job) throws IOException {
+    JobClient jc = new JobClient(job);
+    boolean sucess = true;
+    RunningJob running = null;
+    try {
+      running = jc.submitJob(job);
+      String jobId = running.getJobID();
+      System.out.println("Job " + jobId + " is submitted");
+      while (!running.isComplete()) {
+        System.out.println("Job " + jobId + " is still running.");
+        try {
+          Thread.sleep(60000);
+        } catch (InterruptedException e) {
+        }
+        running = jc.getJob(jobId);
+      }
+      sucess = running.isSuccessful();
+    } finally {
+      if (!sucess && (running != null)) {
+        running.killJob();
+      }
+      jc.close();
+    }
+    return sucess;
+  }
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) {
+    boolean success;
+    if (args.length < 7 || args.length > 9) {
+      System.out.println("usage: DataJoinJob " + "inputdirs outputdir "
+          + "numofParts " + "mapper_class " + "reducer_class "
+          + "map_output_value_class "
+          + "output_value_class [maxNumOfValuesPerGroup [descriptionOfJob]]]");
+      System.exit(-1);
+    }
+
+    try {
+      JobConf job = DataJoinJob.createDataJoinJob(args);
+      success = DataJoinJob.runJob(job);
+      if (!success) {
+        System.out.println("Job failed");
+      }
+    } catch (IOException ioe) {
+      ioe.printStackTrace();
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This abstract class serves as the base class for the mapper class of a data
+ * join job. This class expects its subclasses to implement methods for the
+ * following functionalities:
+ * 
+ * 1. Compute the source tag of input values 2. Compute the map output value
+ * object 3. Compute the map output key object
+ * 
+ * The source tag will be used by the reducer to determine from which source
+ * (which table in SQL terminology) a value comes. Computing the map output
+ * value object amounts to performing projecting/filtering work in a SQL
+ * statement (through the select/where clauses). Computing the map output key
+ * amounts to choosing the join key. This class provides the appropriate plugin
+ * points for the user defined subclasses to implement the appropriate logic.
+ * 
+ */
+public abstract class DataJoinMapperBase extends JobBase {
+
+  protected String inputFile = null;
+
+  protected JobConf job = null;
+
+  protected Text inputTag = null;
+
+  protected Reporter reporter = null;
+
+  public void configure(JobConf job) {
+    super.configure(job);
+    this.job = job;
+    this.inputFile = job.get("map.input.file");
+    this.inputTag = generateInputTag(this.inputFile);
+  }
+
+  /**
+   * Determine the source tag based on the input file name.
+   * 
+   * @param inputFile
+   * @return the source tag computed from the given file name.
+   */
+  protected abstract Text generateInputTag(String inputFile);
+
+  /**
+   * Generate a tagged map output value. The user code can also perform
+   * projection/filtering. If it decides to discard the input record when
+   * certain conditions are met, it can simply return null.
+   * 
+   * @param value
+   * @return an object of TaggedMapOutput computed from the given value.
+   */
+  protected abstract TaggedMapOutput generateTaggedMapOutput(Writable value);
+
+  /**
+   * Generate a map output key. The user code can compute the key
+   * programmatically, not just selecting the values of some fields. In this
+   * sense, it is more general than the joining capabilities of SQL.
+   * 
+   * @param aRecord
+   * @return the group key for the given record
+   */
+  protected abstract Text generateGroupKey(TaggedMapOutput aRecord);
+
+  public void map(WritableComparable key, Writable value,
+      OutputCollector output, Reporter reporter) throws IOException {
+    if (this.reporter == null) {
+      this.reporter = reporter;
+    }
+    addLongValue("totalCount", 1);
+    TaggedMapOutput aRecord = generateTaggedMapOutput(value);
+    if (aRecord == null) {
+      addLongValue("discardedCount", 1);
+      return;
+    }
+    Text groupKey = generateGroupKey(aRecord);
+    if (groupKey == null) {
+      addLongValue("nullGroupKeyCount", 1);
+      return;
+    }
+    output.collect(groupKey, aRecord);
+    addLongValue("collectedCount", 1);
+  }
+
+  public void close() throws IOException {
+    if (this.reporter != null) {
+      this.reporter.setStatus(super.getReport());
+    }
+  }
+
+  public void reduce(WritableComparable arg0, Iterator arg1,
+      OutputCollector arg2, Reporter arg3) throws IOException {
+    // TODO Auto-generated method stub
+
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This abstract class serves as the base class for the reducer class of a data
+ * join job. The reduce function will first group the values according to their
+ * input tags, and then compute the cross product over the groups. For each
+ * tuple in the cross product, it calls the following method, which is expected
+ * to be implemented in a subclass.
+ * 
+ * protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
+ * 
+ * The above method is expected to produce one output value from an array of
+ * records of different sources. The user code can also perform filtering here.
+ * It can return null if it decides that the records do not meet certain
+ * conditions.
+ * 
+ */
+public abstract class DataJoinReducerBase extends JobBase {
+
+  protected Reporter reporter = null;
+
+  private long maxNumOfValuesPerGroup = 100;
+
+  protected long largestNumOfValues = 0;
+
+  protected long numOfValues = 0;
+
+  protected long collected = 0;
+
+  protected JobConf job;
+
+  public void close() throws IOException {
+    if (this.reporter != null) {
+      this.reporter.setStatus(super.getReport());
+    }
+  }
+
+  public void configure(JobConf job) {
+    super.configure(job);
+    this.job = job;
+    this.maxNumOfValuesPerGroup = job.getLong("ultjoin.maxNumOfValuesPerGroup",
+        100);
+  }
+
+  /**
+   * The subclass can provide a different implementation on ResetableIterator.
+   * This is necessary if the number of values in a reduce call is very high.
+   * 
+   * The default provided here uses ArrayListBackedIterator
+   * 
+   * @return an Object of ResetableIterator.
+   */
+  protected ResetableIterator createResetableIterator() {
+    return new ArrayListBackedIterator();
+  }
+
+  /**
+   * This is the function that re-groups values for a key into sub-groups based
+   * on a secondary key (input tag).
+   * 
+   * @param arg1
+   * @return
+   */
+  private SortedMap<Object, ResetableIterator> regroup(Writable key,
+      Iterator arg1, Reporter reporter) throws IOException {
+    this.numOfValues = 0;
+    SortedMap<Object, ResetableIterator> retv = new TreeMap<Object, ResetableIterator>();
+    TaggedMapOutput aRecord = null;
+    while (arg1.hasNext()) {
+      aRecord = (TaggedMapOutput) arg1.next();
+      this.numOfValues += 1;
+      if (this.numOfValues % 100 == 0) {
+        reporter.setStatus("key: " + key.toString() + " numOfValues: "
+            + this.numOfValues);
+      }
+      if (this.numOfValues > this.maxNumOfValuesPerGroup) {
+        continue;
+      }
+      Object tag = aRecord.getTag();
+      ResetableIterator data = retv.get(tag);
+      if (data == null) {
+        data = createResetableIterator();
+        retv.put(tag, data);
+      }
+      data.add(aRecord);
+    }
+    if (this.numOfValues > this.largestNumOfValues) {
+      this.largestNumOfValues = numOfValues;
+      LOG.info("key: " + key.toString() + " this.largestNumOfValues: "
+          + this.largestNumOfValues);
+    }
+    return retv;
+  }
+
+  public void reduce(WritableComparable key, Iterator values,
+      OutputCollector output, Reporter reporter) throws IOException {
+    if (this.reporter == null) {
+      this.reporter = reporter;
+    }
+
+    SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter);
+    Object[] tags = groups.keySet().toArray();
+    ResetableIterator[] groupValues = new ResetableIterator[tags.length];
+    for (int i = 0; i < tags.length; i++) {
+      groupValues[i] = groups.get(tags[i]);
+    }
+    joinAndCollect(tags, groupValues, key, output, reporter);
+    addLongValue("groupCount", 1);
+    for (int i = 0; i < tags.length; i++) {
+      groupValues[i].close();
+    }
+  }
+
+  /**
+   * The subclass can override this method to perform additional filtering
+   * and/or other processing logic before a value is collected.
+   * 
+   * @param key
+   * @param aRecord
+   * @param output
+   * @param reporter
+   * @throws IOException
+   */
+  protected void collect(WritableComparable key, TaggedMapOutput aRecord,
+      OutputCollector output, Reporter reporter) throws IOException {
+    this.collected += 1;
+    addLongValue("collectedCount", 1);
+    if (aRecord != null && this.collected % 1 == 0) {
+      output.collect(key, aRecord.getData());
+      reporter.setStatus("key: " + key.toString() + " collected: " + collected);
+      addLongValue("actuallyCollectedCount", 1);
+    }
+  }
+
+  /**
+   * join the list of the value lists, and collect the results.
+   * 
+   * @param tags
+   *          a list of input tags
+   * @param values
+   *          a list of value lists, each corresponding to one input source
+   * @param key
+   * @param output
+   * @throws IOException
+   */
+  private void joinAndCollect(Object[] tags, ResetableIterator[] values,
+      WritableComparable key, OutputCollector output, Reporter reporter)
+      throws IOException {
+    if (values.length < 1) {
+      return;
+    }
+    Object[] partialList = new Object[values.length];
+    joinAndCollect(tags, values, 0, partialList, key, output, reporter);
+  }
+
+  /**
+   * Perform the actual join recursively.
+   * 
+   * @param tags
+   *          a list of input tags
+   * @param values
+   *          a list of value lists, each corresponding to one input source
+   * @param pos
+   *          indicating the next value list to be joined
+   * @param partialList
+   *          a list of values, each from one value list considered so far.
+   * @param key
+   * @param output
+   * @throws IOException
+   */
+  private void joinAndCollect(Object[] tags, ResetableIterator[] values,
+      int pos, Object[] partialList, WritableComparable key,
+      OutputCollector output, Reporter reporter) throws IOException {
+
+    if (values.length == pos) {
+      // get a value from each source. Combine them
+      TaggedMapOutput combined = combine(tags, partialList);
+      collect(key, combined, output, reporter);
+      return;
+    }
+    ResetableIterator nextValues = values[pos];
+    nextValues.reset();
+    while (nextValues.hasNext()) {
+      Object v = nextValues.next();
+      partialList[pos] = v;
+      joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter);
+    }
+  }
+
+  public static Text SOURCE_TAGS_FIELD = new Text("SOURCE_TAGS");
+
+  public static Text NUM_OF_VALUES_FIELD = new Text("NUM_OF_VALUES");
+
+  /**
+   * 
+   * @param tags
+   *          a list of source tags
+   * @param values
+   *          a value per source
+   * @return combined value derived from values of the sources
+   */
+  protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
+
+  public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+      Reporter arg3) throws IOException {
+    // TODO Auto-generated method stub
+
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/JobBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/JobBase.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/JobBase.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/JobBase.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+ * A common base implementing some statics collecting mechanisms that are
+ * commonly used in a typical map/reduce job.
+ * 
+ */
+public abstract class JobBase implements Mapper, Reducer {
+
+  public static final Log LOG = LogFactory.getLog("datajoin.job");
+
+  private SortedMap<Object, Long> longCounters = null;
+
+  private SortedMap<Object, Double> doubleCounters = null;
+
+  /**
+   * Set the given counter to the given value
+   * 
+   * @param name
+   *          the counter name
+   * @param value
+   *          the value for the counter
+   */
+  protected void setLongValue(Object name, long value) {
+    this.longCounters.put(name, new Long(value));
+  }
+
+  /**
+   * Set the given counter to the given value
+   * 
+   * @param name
+   *          the counter name
+   * @param value
+   *          the value for the counter
+   */
+  protected void setDoubleValue(Object name, double value) {
+    this.doubleCounters.put(name, new Double(value));
+  }
+
+  /**
+   * 
+   * @param name
+   *          the counter name
+   * @return return the value of the given counter.
+   */
+  protected Long getLongValue(Object name) {
+    return this.longCounters.get(name);
+  }
+
+  /**
+   * 
+   * @param name
+   *          the counter name
+   * @return return the value of the given counter.
+   */
+  protected Double getDoubleValue(Object name) {
+    return this.doubleCounters.get(name);
+  }
+
+  /**
+   * Increment the given counter by the given incremental value If the counter
+   * does not exist, one is created with value 0.
+   * 
+   * @param name
+   *          the counter name
+   * @param inc
+   *          the incremental value
+   * @return the updated value.
+   */
+  protected Long addLongValue(Object name, long inc) {
+    Long val = this.longCounters.get(name);
+    Long retv = null;
+    if (val == null) {
+      retv = new Long(inc);
+    } else {
+      retv = new Long(val.longValue() + inc);
+    }
+    this.longCounters.put(name, retv);
+    return retv;
+  }
+
+  /**
+   * Increment the given counter by the given incremental value If the counter
+   * does not exist, one is created with value 0.
+   * 
+   * @param name
+   *          the counter name
+   * @param inc
+   *          the incremental value
+   * @return the updated value.
+   */
+  protected Double addDoubleValue(Object name, double inc) {
+    Double val = this.doubleCounters.get(name);
+    Double retv = null;
+    if (val == null) {
+      retv = new Double(inc);
+    } else {
+      retv = new Double(val.doubleValue() + inc);
+    }
+    this.doubleCounters.put(name, retv);
+    return retv;
+  }
+
+  /**
+   * log the counters
+   * 
+   */
+  protected void report() {
+    LOG.info(getReport());
+  }
+
+  /**
+   * log the counters
+   * 
+   */
+  protected String getReport() {
+    StringBuffer sb = new StringBuffer();
+
+    Iterator iter = this.longCounters.entrySet().iterator();
+    while (iter.hasNext()) {
+      Entry e = (Entry) iter.next();
+      sb.append(e.getKey().toString()).append("\t").append(e.getValue())
+          .append("\n");
+    }
+    iter = this.doubleCounters.entrySet().iterator();
+    while (iter.hasNext()) {
+      Entry e = (Entry) iter.next();
+      sb.append(e.getKey().toString()).append("\t").append(e.getValue())
+          .append("\n");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Initializes a new instance from a {@link JobConf}.
+   * 
+   * @param job
+   *          the configuration
+   */
+  public void configure(JobConf job) {
+    this.longCounters = new TreeMap<Object, Long>();
+    this.doubleCounters = new TreeMap<Object, Double>();
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ResetableIterator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ResetableIterator.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ResetableIterator.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ResetableIterator.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * This interface defines an iterator interface that will help the reducer class
+ * re-group the values in the values iterator of the reduce method
+ * according to their source tags. Once the values are re-grouped, the reducer
+ * can perform the cross product over the values in different groups.
+ *
+ * @author runping
+ *
+ */
+public interface ResetableIterator extends Iterator {
+  /**
+   * Called by the reducer's joinAndCollect before each pass over the values.
+   * NOTE(review): implementations are expected to restart iteration from the
+   * first item so the same values can be walked repeatedly - contract
+   * inferred from that usage, confirm against the implementations.
+   */
+  public void reset();
+
+  /**
+   * Hands an item to this iterator. NOTE(review): presumably the item is
+   * stored so that later passes return it - confirm against the
+   * implementations.
+   *
+   * @param item
+   *          the item to add
+   */
+  public void add(Object item);
+
+  /**
+   * Closes this iterator. NOTE(review): presumably releases whatever
+   * resources back the stored items - confirm against the implementations.
+   *
+   * @throws IOException
+   *           declared for implementations whose close can fail
+   */
+  public void close() throws IOException;
+}

Added: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/TaggedMapOutput.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/TaggedMapOutput.java?view=auto&rev=521906
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/TaggedMapOutput.java (added)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/TaggedMapOutput.java Fri Mar 23 14:15:53 2007
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.utils.join;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Base class for the values that flow from the mappers to the reducers in a
+ * data join job. Typically, in such a job, a mapper derives the source tag of
+ * an input record from the record's attributes or from the name of the input
+ * file, and the reducers use that tag to re-group the values of a given key
+ * by source.
+ *
+ * @author runping
+ *
+ */
+public abstract class TaggedMapOutput implements Writable {
+  /** Source tag of this value; starts out as an empty Text. */
+  protected Text tag = new Text("");
+
+  /** Creates an instance whose tag is the empty Text. */
+  public TaggedMapOutput() {
+  }
+
+  /**
+   * @return the current source tag (the stored instance, not a copy)
+   */
+  public Text getTag() {
+    return this.tag;
+  }
+
+  /**
+   * Replaces the source tag. The given instance is stored as is, without
+   * copying.
+   *
+   * @param tag
+   *          the new source tag
+   */
+  public void setTag(Text tag) {
+    this.tag = tag;
+  }
+
+  /**
+   * @return the data carried by this value; defined by subclasses
+   */
+  public abstract Writable getData();
+}