Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/08/27 23:54:01 UTC

svn commit: r689635 - in /hadoop/core/trunk: ./ src/examples/pipes/impl/ src/mapred/org/apache/hadoop/mapred/pipes/ src/test/org/apache/hadoop/mapred/pipes/

Author: omalley
Date: Wed Aug 27 14:54:00 2008
New Revision: 689635

URL: http://svn.apache.org/viewvc?rev=689635&view=rev
Log:
HADOOP-2168. Fix problem with C++ record reader's progress not being
reported to framework. (acmurthy via omalley)
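
For readers skimming the diff below: the essence of the change is that the progress value the C++ record reader reports over the pipe is now pushed into a dummy Java RecordReader that the framework already polls. What follows is a minimal, self-contained sketch of that relay, using stand-in names rather than the actual classes touched by this commit:

public class ProgressRelaySketch {

  /** Stand-in for the PipesDummyRecordReader added below. */
  static class DummyReader {
    private float progress = 0.0f;

    /** The "key" carries the progress fraction; there is no real record. */
    boolean next(float key) {
      progress = key;
      return true;
    }

    /** Polled by the map task, which reports it to the framework. */
    float getProgress() {
      return progress;
    }
  }

  /** Stand-in for OutputHandler.progress(float). */
  static void onProgressFromPipe(DummyReader reader, float progress) {
    if (reader != null) {                        // null when a Java record reader is in use
      reader.next(progress);
    }
  }

  public static void main(String[] args) {
    DummyReader reader = new DummyReader();
    onProgressFromPipe(reader, 0.42f);           // the C++ reader says it is 42% done
    System.out.println(reader.getProgress());    // 0.42, now visible to the framework
  }
}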

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Aug 27 14:54:00 2008
@@ -396,6 +396,9 @@
     input jobs. Delays limiting task placement until after 10% of the maps
     have finished. (Ari Rabkin via omalley)
 
+    HADOOP-2168. Fix problem with C++ record reader's progress not being
+    reported to framework. (acmurthy via omalley)
+
 Release 0.18.1 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc (original)
+++ hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc Wed Aug 27 14:54:00 2008
@@ -105,7 +105,7 @@
    */
   virtual float getProgress() {
     if (bytesTotal > 0) {
-      return bytesRead / bytesTotal;
+      return (float)bytesRead / bytesTotal;
     } else {
       return 1.0f;
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java Wed Aug 27 14:54:00 2008
@@ -31,10 +31,13 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskLog;
@@ -59,6 +62,7 @@
   /**
    * Start the child process to handle the task for us.
    * @param conf the task's configuration
+   * @param recordReader the fake record reader to update progress with
    * @param output the collector to send output to
    * @param reporter the reporter for the task
    * @param outputKeyClass the class of the output keys
@@ -66,7 +70,9 @@
    * @throws IOException
    * @throws InterruptedException
    */
-  Application(JobConf conf, OutputCollector<K2, V2> output, Reporter reporter,
+  Application(JobConf conf, 
+              RecordReader<FloatWritable, NullWritable> recordReader, 
+              OutputCollector<K2,V2> output, Reporter reporter,
               Class<? extends K2> outputKeyClass,
               Class<? extends V2> outputValueClass
               ) throws IOException, InterruptedException {
@@ -89,7 +95,7 @@
 
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
-    handler = new OutputHandler<K2, V2>(output, reporter);
+    handler = new OutputHandler<K2, V2>(output, reporter, recordReader);
     K2 outputKey = (K2)
       ReflectionUtils.newInstance(outputKeyClass, conf);
     V2 outputValue = (V2) 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java Wed Aug 27 14:54:00 2008
@@ -24,10 +24,13 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
@@ -42,6 +45,7 @@
   private float progressValue = 0.0f;
   private boolean done = false;
   private Throwable exception = null;
+  RecordReader<FloatWritable,NullWritable> recordReader = null;
   private Map<Integer, Counters.Counter> registeredCounters = 
     new HashMap<Integer, Counters.Counter>();
 
@@ -50,9 +54,11 @@
    * @param collector the "real" collector that takes the output
    * @param reporter the reporter for reporting progress
    */
-  public OutputHandler(OutputCollector<K, V> collector, Reporter reporter) {
+  public OutputHandler(OutputCollector<K, V> collector, Reporter reporter, 
+                       RecordReader<FloatWritable,NullWritable> recordReader) {
     this.reporter = reporter;
     this.collector = collector;
+    this.recordReader = recordReader;
   }
 
   /**
@@ -78,12 +84,19 @@
     reporter.setStatus(msg);
   }
 
+  private FloatWritable progressKey = new FloatWritable(0.0f);
+  private NullWritable nullValue = NullWritable.get();
   /**
    * Update the amount done and call progress on the reporter.
    */
   public void progress(float progress) throws IOException {
     progressValue = progress;
     reporter.progress();
+    
+    if (recordReader != null) {
+      progressKey.set(progress);
+      recordReader.next(progressKey, nullValue);
+    }
   }
 
   /**

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Wed Aug 27 14:54:00 2008
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -59,7 +61,12 @@
                   Reporter reporter) throws IOException {
     Application<K1, V1, K2, V2> application = null;
     try {
-      application = new Application<K1, V1, K2, V2>(job, output, reporter,
+      RecordReader<FloatWritable, NullWritable> fakeInput = 
+        (!Submitter.getIsJavaRecordReader(job) && 
+         !Submitter.getIsJavaMapper(job)) ? 
+          (RecordReader<FloatWritable, NullWritable>) input : null;
+      application = new Application<K1, V1, K2, V2>(job, fakeInput, output, 
+                                                    reporter,
           (Class<? extends K2>) job.getOutputKeyClass(), 
           (Class<? extends V2>) job.getOutputValueClass());
     } catch (InterruptedException ie) {

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java?rev=689635&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java Wed Aug 27 14:54:00 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Dummy input format used when a non-Java {@link RecordReader} is used by
+ * the Pipes application.
+ *
+ * The only useful thing this does is set up the Map-Reduce job to get the
+ * {@link PipesDummyRecordReader}; everything else is left to the 'actual'
+ * InputFormat specified by the user, which is given by
+ * <i>mapred.pipes.user.inputformat</i>.
+ */
+class PipesNonJavaInputFormat 
+implements InputFormat<FloatWritable, NullWritable> {
+
+  public RecordReader<FloatWritable, NullWritable> getRecordReader(
+      InputSplit genericSplit, JobConf job, Reporter reporter)
+      throws IOException {
+    return new PipesDummyRecordReader(job, genericSplit);
+  }
+  
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    // Delegate the generation of input splits to the 'original' InputFormat
+    return ReflectionUtils.newInstance(
+        job.getClass("mapred.pipes.user.inputformat", 
+                     TextInputFormat.class, 
+                     InputFormat.class), job).getSplits(job, numSplits);
+  }
+
+  /**
+   * A dummy {@link org.apache.hadoop.mapred.RecordReader} to help track the
+   * progress of Hadoop Pipes' applications when they are using a non-Java
+   * <code>RecordReader</code>.
+   *
+   * The <code>PipesDummyRecordReader</code> is informed of the 'progress' of
+   * the task by {@link OutputHandler#progress(float)}, which calls the
+   * {@link #next(FloatWritable, NullWritable)} with the progress as the
+   * <code>key</code>.
+   */
+  class PipesDummyRecordReader implements RecordReader<FloatWritable, NullWritable> {
+    float progress = 0.0f;
+    
+    public PipesDummyRecordReader(Configuration job, InputSplit split)
+    throws IOException{
+    }
+
+    
+    public FloatWritable createKey() {
+      return null;
+    }
+
+    public NullWritable createValue() {
+      return null;
+    }
+
+    public synchronized void close() throws IOException {}
+
+    public synchronized long getPos() throws IOException {
+      return 0;
+    }
+
+    public float getProgress() {
+      return progress;
+    }
+
+    public synchronized boolean next(FloatWritable key, NullWritable value)
+        throws IOException {
+      progress = key.get();
+      return true;
+    }
+  }
+}
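
A rough usage sketch of the class added above, exercising the dummy reader directly. Since both classes are package-private, code like this would have to sit in org.apache.hadoop.mapred.pipes, and the null split and reporter are tolerable only because the dummy constructor ignores them:

package org.apache.hadoop.mapred.pipes;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

public class DummyReaderUsage {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    PipesNonJavaInputFormat format = new PipesNonJavaInputFormat();
    // getRecordReader() hands back a PipesDummyRecordReader.
    RecordReader<FloatWritable, NullWritable> reader =
        format.getRecordReader(null /* split ignored */, conf, null /* reporter ignored */);
    // OutputHandler.progress() calls next() with the progress as the key ...
    reader.next(new FloatWritable(0.75f), NullWritable.get());
    // ... and the framework reads it back through getProgress().
    System.out.println(reader.getProgress());   // 0.75
  }
}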

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java Wed Aug 27 14:54:00 2008
@@ -78,9 +78,11 @@
     if (application == null) {
       try {
         LOG.info("starting application");
-        application = new Application<K2, V2, K3, V3>(job, output, reporter, 
-                                      (Class<? extends K3>) job.getOutputKeyClass(), 
-                                      (Class<? extends V3>) job.getOutputValueClass());
+        application = 
+          new Application<K2, V2, K3, V3>(
+              job, null, output, reporter, 
+              (Class<? extends K3>) job.getOutputKeyClass(), 
+              (Class<? extends V3>) job.getOutputValueClass());
         downlink = application.getDownlink();
       } catch (InterruptedException ie) {
         throw new RuntimeException("interrupted", ie);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java Wed Aug 27 14:54:00 2008
@@ -261,6 +261,15 @@
     setIfUnset(conf, "mapred.mapoutput.value.class", textClassname);
     setIfUnset(conf, "mapred.output.key.class", textClassname);
     setIfUnset(conf, "mapred.output.value.class", textClassname);
+    
+    // Use PipesNonJavaInputFormat if necessary to handle progress reporting
+    // from C++ RecordReaders ...
+    if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
+      conf.setClass("mapred.pipes.user.inputformat", 
+                    conf.getInputFormat().getClass(), InputFormat.class);
+      conf.setInputFormat(PipesNonJavaInputFormat.class);
+    }
+    
     String exec = getExecutable(conf);
     if (exec == null) {
       throw new IllegalArgumentException("No application program defined.");
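
For orientation, the swap above only happens for jobs in which neither the record reader nor the mapper is Java. Here is a hedged sketch of such a job's setup, assuming the usual Submitter setters that mirror the getters referenced in this commit (setIsJavaRecordReader, setIsJavaMapper, setExecutable); the executable path is a placeholder:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.pipes.Submitter;

public class PipesJobSetupSketch {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setInputFormat(TextInputFormat.class);      // the user's "real" input format
    Submitter.setIsJavaRecordReader(conf, false);    // the record reader lives in the C++ binary
    Submitter.setIsJavaMapper(conf, false);          // so does the mapper
    Submitter.setExecutable(conf, "/path/to/wordcount-nopipe");  // placeholder path
    // Submitter.submitJob(conf) would now install PipesNonJavaInputFormat and keep
    // TextInputFormat under mapred.pipes.user.inputformat for split generation,
    // so the framework polls the dummy record reader for progress.
  }
}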

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Wed Aug 27 14:54:00 2008
@@ -217,15 +217,26 @@
     DataOutputStream out = local.create(new Path(inDir, "part0"));
     out.writeBytes("i am a silly test\n");
     out.writeBytes("you are silly\n");
+    out.writeBytes("i am a cat test\n");
+    out.writeBytes("you is silly\n");
+    out.writeBytes("i am a billy test\n");
+    out.writeBytes("hello are silly\n");
     out.close();
     out = local.create(new Path(inDir, "part1"));
-    out.writeBytes("all silly things drink java\n");
+    out.writeBytes("mall world things drink java\n");
+    out.writeBytes("hall silly cats drink java\n");
+    out.writeBytes("all dogs bow wow\n");
+    out.writeBytes("hello drink java\n");
     out.close();
     local.delete(outDir, true);
     local.mkdirs(outDir);
     out = local.create(jobXml);
     job.write(out);
     out.close();
+    System.err.println("About to run: Submitter -conf " + jobXml + 
+                       " -input " + inDir + " -output " + outDir + 
+                       " -program " + 
+                       dfs.getFileSystem().makeQualified(wordExec));
     try {
       Submitter.main(new String[]{"-conf", jobXml.toString(),
                                   "-input", inDir.toString(),