You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/06/12 00:42:51 UTC

svn commit: r546310 - in /lucene/hadoop/trunk: ./ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/

Author: cutting
Date: Mon Jun 11 15:42:49 2007
New Revision: 546310

URL: http://svn.apache.org/viewvc?view=rev&rev=546310
Log:
HADOOP-1447.  Add support to contrib/data_join for text inputs.  Contributed by Senthil Subramanian.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java
    lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=546310&r1=546309&r2=546310
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jun 11 15:42:49 2007
@@ -104,6 +104,9 @@
      verified after data is read from large buffers, to better catch
      memory errors.  (cutting)
 
+ 34. HADOOP-1447.  Add support in contrib/data_join for text inputs.
+     (Senthil Subramanian via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java?view=diff&rev=546310&r1=546309&r2=546310
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java (original)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java Mon Jun 11 15:42:49 2007
@@ -33,13 +33,13 @@
 
   private Iterator iter;
 
-  private ArrayList data;
+  private ArrayList<Object> data;
 
   public ArrayListBackedIterator() {
-    this(new ArrayList());
+    this(new ArrayList<Object>());
   }
 
-  public ArrayListBackedIterator(ArrayList data) {
+  public ArrayListBackedIterator(ArrayList<Object> data) {
     this.data = data;
     this.iter = this.data.iterator();
   }

Modified: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java?view=diff&rev=546310&r1=546309&r2=546310
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java Mon Jun 11 15:42:49 2007
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 
 /**
@@ -58,26 +59,33 @@
 
     String inputDir = args[0];
     String outputDir = args[1];
-    int numOfReducers = Integer.parseInt(args[2]);
-    Class mapper = getClassByName(args[3]);
-    Class reducer = getClassByName(args[4]);
-    Class mapoutputValueClass = getClassByName(args[5]);
+    Class inputFormat = SequenceFileInputFormat.class;
+    if (args[2].compareToIgnoreCase("text") != 0) {
+      System.out.println("Using SequenceFileInputFormat: " + args[2]);
+    } else {
+      System.out.println("Using TextInputFormat: " + args[2]);
+      inputFormat = TextInputFormat.class;
+    }
+    int numOfReducers = Integer.parseInt(args[3]);
+    Class mapper = getClassByName(args[4]);
+    Class reducer = getClassByName(args[5]);
+    Class mapoutputValueClass = getClassByName(args[6]);
     Class outputFormat = TextOutputFormat.class;
     Class outputValueClass = Text.class;
-    if (args[6].compareToIgnoreCase("text") != 0) {
-      System.out.println("Using SequenceFileOutputFormat: " + args[6]);
+    if (args[7].compareToIgnoreCase("text") != 0) {
+      System.out.println("Using SequenceFileOutputFormat: " + args[7]);
       outputFormat = SequenceFileOutputFormat.class;
-      outputValueClass = getClassByName(args[6]);
+      outputValueClass = getClassByName(args[7]);
     } else {
-      System.out.println("Using TextOutputFormat: " + args[6]);
+      System.out.println("Using TextOutputFormat: " + args[7]);
     }
     long maxNumOfValuesPerGroup = 100;
     String jobName = "";
-    if (args.length > 7) {
-      maxNumOfValuesPerGroup = Long.parseLong(args[7]);
-    }
     if (args.length > 8) {
-      jobName = args[8];
+      maxNumOfValuesPerGroup = Long.parseLong(args[8]);
+    }
+    if (args.length > 9) {
+      jobName = args[9];
     }
     Configuration defaults = new Configuration();
     JobConf job = new JobConf(defaults, DataJoinJob.class);
@@ -91,7 +99,7 @@
       job.addInputPath(new Path(spec));
     }
 
-    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputFormat(inputFormat);
 
     job.setMapperClass(mapper);
     job.setOutputPath(new Path(outputDir));
@@ -106,10 +114,7 @@
 
     job.setNumMapTasks(1);
     job.setNumReduceTasks(numOfReducers);
-    job.setLong("ultjoin.maxNumOfValuesPerGroup",
-                maxNumOfValuesPerGroup);
-    job.set("mapred.child.java.opts", "-Xmx1024m");
-    job.setKeepFailedTaskFiles(true);
+    job.setLong("datajoin.maxNumOfValuesPerGroup", maxNumOfValuesPerGroup);
     return job;
   }
 
@@ -151,8 +156,8 @@
    */
   public static void main(String[] args) {
     boolean success;
-    if (args.length < 7 || args.length > 9) {
-      System.out.println("usage: DataJoinJob " + "inputdirs outputdir "
+    if (args.length < 8 || args.length > 10) {
+      System.out.println("usage: DataJoinJob " + "inputdirs outputdir map_input_file_format "
                          + "numofParts " + "mapper_class " + "reducer_class "
                          + "map_output_value_class "
                          + "output_value_class [maxNumOfValuesPerGroup [descriptionOfJob]]]");

Modified: lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java?view=diff&rev=546310&r1=546309&r2=546310
==============================================================================
--- lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java (original)
+++ lucene/hadoop/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java Mon Jun 11 15:42:49 2007
@@ -68,8 +68,7 @@
   public void configure(JobConf job) {
     super.configure(job);
     this.job = job;
-    this.maxNumOfValuesPerGroup = job.getLong("ultjoin.maxNumOfValuesPerGroup",
-                                              100);
+    this.maxNumOfValuesPerGroup = job.getLong("datajoin.maxNumOfValuesPerGroup", 100);
   }
 
   /**
@@ -155,7 +154,7 @@
                          OutputCollector output, Reporter reporter) throws IOException {
     this.collected += 1;
     addLongValue("collectedCount", 1);
-    if (aRecord != null && this.collected % 1 == 0) {
+    if (aRecord != null) {
       output.collect(key, aRecord.getData());
       reporter.setStatus("key: " + key.toString() + " collected: " + collected);
       addLongValue("actuallyCollectedCount", 1);