You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/08/27 23:54:01 UTC
svn commit: r689635 - in /hadoop/core/trunk: ./ src/examples/pipes/impl/
src/mapred/org/apache/hadoop/mapred/pipes/
src/test/org/apache/hadoop/mapred/pipes/
Author: omalley
Date: Wed Aug 27 14:54:00 2008
New Revision: 689635
URL: http://svn.apache.org/viewvc?rev=689635&view=rev
Log:
HADOOP-2168. Fix problem with C++ record reader's progress not being
reported to framework. (acmurthy via omalley)
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Aug 27 14:54:00 2008
@@ -396,6 +396,9 @@
input jobs. Delays limiting task placement until after 10% of the maps
have finished. (Ari Rabkin via omalley)
+ HADOOP-2168. Fix problem with C++ record reader's progress not being
+ reported to framework. (acmurthy via omalley)
+
Release 0.18.1 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc (original)
+++ hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc Wed Aug 27 14:54:00 2008
@@ -105,7 +105,7 @@
*/
virtual float getProgress() {
if (bytesTotal > 0) {
- return bytesRead / bytesTotal;
+ return (float)bytesRead / bytesTotal;
} else {
return 1.0f;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java Wed Aug 27 14:54:00 2008
@@ -31,10 +31,13 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskLog;
@@ -59,6 +62,7 @@
/**
* Start the child process to handle the task for us.
* @param conf the task's configuration
+ * @param recordReader the fake record reader to update progress with
* @param output the collector to send output to
* @param reporter the reporter for the task
* @param outputKeyClass the class of the output keys
@@ -66,7 +70,9 @@
* @throws IOException
* @throws InterruptedException
*/
- Application(JobConf conf, OutputCollector<K2, V2> output, Reporter reporter,
+ Application(JobConf conf,
+ RecordReader<FloatWritable, NullWritable> recordReader,
+ OutputCollector<K2,V2> output, Reporter reporter,
Class<? extends K2> outputKeyClass,
Class<? extends V2> outputValueClass
) throws IOException, InterruptedException {
@@ -89,7 +95,7 @@
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
- handler = new OutputHandler<K2, V2>(output, reporter);
+ handler = new OutputHandler<K2, V2>(output, reporter, recordReader);
K2 outputKey = (K2)
ReflectionUtils.newInstance(outputKeyClass, conf);
V2 outputValue = (V2)
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java Wed Aug 27 14:54:00 2008
@@ -24,10 +24,13 @@
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
/**
@@ -42,6 +45,7 @@
private float progressValue = 0.0f;
private boolean done = false;
private Throwable exception = null;
+ RecordReader<FloatWritable,NullWritable> recordReader = null;
private Map<Integer, Counters.Counter> registeredCounters =
new HashMap<Integer, Counters.Counter>();
@@ -50,9 +54,11 @@
* @param collector the "real" collector that takes the output
* @param reporter the reporter for reporting progress
*/
- public OutputHandler(OutputCollector<K, V> collector, Reporter reporter) {
+ public OutputHandler(OutputCollector<K, V> collector, Reporter reporter,
+ RecordReader<FloatWritable,NullWritable> recordReader) {
this.reporter = reporter;
this.collector = collector;
+ this.recordReader = recordReader;
}
/**
@@ -78,12 +84,19 @@
reporter.setStatus(msg);
}
+ private FloatWritable progressKey = new FloatWritable(0.0f);
+ private NullWritable nullValue = NullWritable.get();
/**
* Update the amount done and call progress on the reporter.
*/
public void progress(float progress) throws IOException {
progressValue = progress;
reporter.progress();
+
+ if (recordReader != null) {
+ progressKey.set(progress);
+ recordReader.next(progressKey, nullValue);
+ }
}
/**
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Wed Aug 27 14:54:00 2008
@@ -20,6 +20,8 @@
import java.io.IOException;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
@@ -59,7 +61,12 @@
Reporter reporter) throws IOException {
Application<K1, V1, K2, V2> application = null;
try {
- application = new Application<K1, V1, K2, V2>(job, output, reporter,
+ RecordReader<FloatWritable, NullWritable> fakeInput =
+ (!Submitter.getIsJavaRecordReader(job) &&
+ !Submitter.getIsJavaMapper(job)) ?
+ (RecordReader<FloatWritable, NullWritable>) input : null;
+ application = new Application<K1, V1, K2, V2>(job, fakeInput, output,
+ reporter,
(Class<? extends K2>) job.getOutputKeyClass(),
(Class<? extends V2>) job.getOutputValueClass());
} catch (InterruptedException ie) {
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java?rev=689635&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java Wed Aug 27 14:54:00 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Dummy input format used when a non-Java {@link RecordReader} is used by
+ * the Pipes' application.
+ *
+ * The only useful thing this does is set up the Map-Reduce job to get the
+ * {@link PipesDummyRecordReader}; everything else is left to the 'actual'
+ * InputFormat specified by the user, which is given by
+ * <i>mapred.pipes.user.inputformat</i>.
+ */
+class PipesNonJavaInputFormat
+implements InputFormat<FloatWritable, NullWritable> {
+
+ public RecordReader<FloatWritable, NullWritable> getRecordReader(
+ InputSplit genericSplit, JobConf job, Reporter reporter)
+ throws IOException {
+ return new PipesDummyRecordReader(job, genericSplit);
+ }
+
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ // Delegate the generation of input splits to the 'original' InputFormat
+ return ReflectionUtils.newInstance(
+ job.getClass("mapred.pipes.user.inputformat",
+ TextInputFormat.class,
+ InputFormat.class), job).getSplits(job, numSplits);
+ }
+
+ /**
+ * A dummy {@link org.apache.hadoop.mapred.RecordReader} to help track the
+ * progress of Hadoop Pipes' applications when they are using a non-Java
+ * <code>RecordReader</code>.
+ *
+ * The <code>PipesDummyRecordReader</code> is informed of the 'progress' of
+ * the task by the {@link OutputHandler#progress(float)} which calls the
+ * {@link #next(FloatWritable, NullWritable)} with the progress as the
+ * <code>key</code>.
+ */
+ class PipesDummyRecordReader implements RecordReader<FloatWritable, NullWritable> {
+ float progress = 0.0f;
+
+ public PipesDummyRecordReader(Configuration job, InputSplit split)
+ throws IOException{
+ }
+
+
+ public FloatWritable createKey() {
+ return null;
+ }
+
+ public NullWritable createValue() {
+ return null;
+ }
+
+ public synchronized void close() throws IOException {}
+
+ public synchronized long getPos() throws IOException {
+ return 0;
+ }
+
+ public float getProgress() {
+ return progress;
+ }
+
+ public synchronized boolean next(FloatWritable key, NullWritable value)
+ throws IOException {
+ progress = key.get();
+ return true;
+ }
+ }
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java Wed Aug 27 14:54:00 2008
@@ -78,9 +78,11 @@
if (application == null) {
try {
LOG.info("starting application");
- application = new Application<K2, V2, K3, V3>(job, output, reporter,
- (Class<? extends K3>) job.getOutputKeyClass(),
- (Class<? extends V3>) job.getOutputValueClass());
+ application =
+ new Application<K2, V2, K3, V3>(
+ job, null, output, reporter,
+ (Class<? extends K3>) job.getOutputKeyClass(),
+ (Class<? extends V3>) job.getOutputValueClass());
downlink = application.getDownlink();
} catch (InterruptedException ie) {
throw new RuntimeException("interrupted", ie);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java Wed Aug 27 14:54:00 2008
@@ -261,6 +261,15 @@
setIfUnset(conf, "mapred.mapoutput.value.class", textClassname);
setIfUnset(conf, "mapred.output.key.class", textClassname);
setIfUnset(conf, "mapred.output.value.class", textClassname);
+
+ // Use PipesNonJavaInputFormat if necessary to handle progress reporting
+ // from C++ RecordReaders ...
+ if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
+ conf.setClass("mapred.pipes.user.inputformat",
+ conf.getInputFormat().getClass(), InputFormat.class);
+ conf.setInputFormat(PipesNonJavaInputFormat.class);
+ }
+
String exec = getExecutable(conf);
if (exec == null) {
throw new IllegalArgumentException("No application program defined.");
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=689635&r1=689634&r2=689635&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Wed Aug 27 14:54:00 2008
@@ -217,15 +217,26 @@
DataOutputStream out = local.create(new Path(inDir, "part0"));
out.writeBytes("i am a silly test\n");
out.writeBytes("you are silly\n");
+ out.writeBytes("i am a cat test\n");
+ out.writeBytes("you is silly\n");
+ out.writeBytes("i am a billy test\n");
+ out.writeBytes("hello are silly\n");
out.close();
out = local.create(new Path(inDir, "part1"));
- out.writeBytes("all silly things drink java\n");
+ out.writeBytes("mall world things drink java\n");
+ out.writeBytes("hall silly cats drink java\n");
+ out.writeBytes("all dogs bow wow\n");
+ out.writeBytes("hello drink java\n");
out.close();
local.delete(outDir, true);
local.mkdirs(outDir);
out = local.create(jobXml);
job.write(out);
out.close();
+ System.err.println("About to run: Submitter -conf " + jobXml +
+ " -input " + inDir + " -output " + outDir +
+ " -program " +
+ dfs.getFileSystem().makeQualified(wordExec));
try {
Submitter.main(new String[]{"-conf", jobXml.toString(),
"-input", inDir.toString(),