You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/08/11 14:03:47 UTC
svn commit: r684731 - in /hadoop/core/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/mapred/org/apache/hadoop/mapred/
src/mapred/org/apache/hadoop/mapred/lib/ src/ma...
Author: ddas
Date: Mon Aug 11 05:03:37 2008
New Revision: 684731
URL: http://svn.apache.org/viewvc?rev=684731&view=rev
Log:
HADOOP-153. Provides a way to skip bad records. Contributed by Sharad Agarwal.
Added:
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSortedRanges.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Aug 11 05:03:37 2008
@@ -70,6 +70,8 @@
HADOOP-2302. Provides a comparator for numerical sorting of key fields.
(ddas)
+ HADOOP-153. Provides a way to skip bad records. (Sharad Agarwal via ddas)
+
IMPROVEMENTS
HADOOP-3732. Delay intialization of datanode block verification till
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Mon Aug 11 05:03:37 2008
@@ -25,6 +25,7 @@
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.StringUtils;
@@ -34,6 +35,7 @@
public class PipeMapper extends PipeMapRed implements Mapper {
private boolean ignoreKey = false;
+ private boolean skipping = false;
private byte[] mapOutputFieldSeparator;
private byte[] mapInputFieldSeparator;
@@ -59,6 +61,11 @@
public void configure(JobConf job) {
super.configure(job);
+ //disable the auto increment of the counter. For streaming, the number of
+ //processed records can differ from (be equal to or less than) the number
+ //of input records.
+ SkipBadRecords.setAutoIncrMapperProcCount(job, false);
+ skipping = job.getBoolean("mapred.skip.on", false);
String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
@@ -101,6 +108,11 @@
}
write(value);
clientOut_.write('\n');
+ if(skipping) {
+ //flush the streams on every record input if running in skip mode
+ //so that we don't buffer other records surrounding a bad record.
+ clientOut_.flush();
+ }
} else {
numRecSkipped_++;
}
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Mon Aug 11 05:03:37 2008
@@ -27,6 +27,7 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.io.Writable;
@@ -39,6 +40,7 @@
private byte[] reduceOutFieldSeparator;
private byte[] reduceInputFieldSeparator;
private int numOfReduceOutputKeyFields = 1;
+ private boolean skipping = false;
String getPipeCommand(JobConf job) {
String str = job.get("stream.reduce.streamprocessor");
@@ -61,6 +63,11 @@
public void configure(JobConf job) {
super.configure(job);
+ //disable the auto increment of the counter. For streaming, the number of
+ //processed records can differ from (be equal to or less than) the number
+ //of input records.
+ SkipBadRecords.setAutoIncrReducerProcCount(job, false);
+ skipping = job.getBoolean("mapred.skip.on", false);
try {
reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
@@ -99,6 +106,11 @@
output.collect(key, val);
}
}
+ if(doPipe_ && skipping) {
+ //flush the streams on every record input if running in skip mode
+ //so that we don't buffer other records surrounding a bad record.
+ clientOut_.flush();
+ }
} catch (IOException io) {
// a common reason to get here is failure of the subprocess.
// Document that fact, if possible.
Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java?rev=684731&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java Mon Aug 11 05:03:37 2008
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SkipBadRecords;
+
+/**
+ * Runs a streaming job whose mapper and reducer commands misbehave on a
+ * fixed set of records, and checks the job outcome with skip mode disabled
+ * (job must not succeed) and with the default skip settings (job must
+ * succeed and the bad records must be absent from the output).
+ */
+public class TestStreamingBadRecords extends ClusterMapReduceTestCase
+{
+
+  // Keyed on the class itself so the log category always matches the real
+  // package of this test (org.apache.hadoop.streaming).
+  private static final Log LOG =
+    LogFactory.getLog(TestStreamingBadRecords.class);
+
+  // Records on which the mapper command exits, throws and sleeps respectively
+  // (see BadApp.processLine).
+  private static final List<String> MAPPER_BAD_RECORDS =
+    Arrays.asList("hey022","hey023","hey099");
+
+  // Records on which the reducer command exits and throws respectively.
+  private static final List<String> REDUCER_BAD_RECORDS =
+    Arrays.asList("hey001","hey024");
+
+  private static final String badMapper =
+    StreamUtil.makeJavaCommand(BadApp.class, new String[]{});
+  private static final String badReducer =
+    StreamUtil.makeJavaCommand(BadApp.class, new String[]{"true"});
+  private static final int INPUTSIZE=100;
+
+  public TestStreamingBadRecords() throws IOException
+  {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  /**
+   * Writes INPUTSIZE lines of the form "&lt;prefix&gt;heyNNN" (NNN zero padded
+   * to three digits) to text.txt in the input directory.
+   */
+  private void createInput() throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(),
+                                                      "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    try {
+      //increasing the record size so that we have stream flushing
+      String prefix = new String(new byte[20*1024]);
+      for(int i=1;i<=INPUTSIZE;i++) {
+        String str = ""+i;
+        while(str.length() < 3) {
+          str = "0"+str;
+        }
+        wr.write(prefix + "hey"+str+"\n");
+      }
+    } finally {
+      // close (and thereby flush) the file even if a write fails
+      wr.close();
+    }
+  }
+
+  /**
+   * Asserts that the job succeeded and that no line of the first output file
+   * carries one of the known bad records.
+   *
+   * @param runningJob the finished job
+   * @param validateCount whether to also assert that exactly
+   *        INPUTSIZE minus the number of bad records lines were written
+   */
+  private void validateOutput(RunningJob runningJob, boolean validateCount)
+    throws Exception {
+    LOG.info(runningJob.getCounters().toString());
+    assertTrue(runningJob.isSuccessful());
+    List<String> badRecs = new ArrayList<String>();
+    badRecs.addAll(MAPPER_BAD_RECORDS);
+    badRecs.addAll(REDUCER_BAD_RECORDS);
+    Path[] outputFiles = FileUtil.stat2Paths(
+        getFileSystem().listStatus(getOutputDir(),
+        new OutputLogFilter()));
+
+    if (outputFiles.length > 0) {
+      InputStream is = getFileSystem().open(outputFiles[0]);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+      int counter = 0;
+      try {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          counter++;
+          StringTokenizer tokeniz = new StringTokenizer(line, "\t");
+          String value = tokeniz.nextToken();
+          int index = value.indexOf("hey");
+          assertTrue(index>-1);
+          String heyStr = value.substring(index);
+          assertTrue(!badRecs.contains(heyStr));
+        }
+      } finally {
+        // release the stream even when one of the assertions above fails
+        reader.close();
+      }
+      if(validateCount) {
+        assertEquals(INPUTSIZE-badRecs.size(), counter);
+      }
+    }
+  }
+
+  /**
+   * Builds the StreamJob command line shared by both tests.
+   *
+   * @param clusterConf configuration of the test cluster
+   * @param disableSkip when true, "mapred.skip.mode.enabled=false" is passed
+   *        to the job; when false the property is left at its default
+   */
+  private String[] buildArgs(JobConf clusterConf, boolean disableSkip)
+    throws Exception {
+    //the no of attempts to successfully complete the task depends
+    //on the no of bad records.
+    int mapperAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
+      +1+MAPPER_BAD_RECORDS.size();
+    int reducerAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
+      +1+REDUCER_BAD_RECORDS.size();
+
+    List<String> args = new ArrayList<String>();
+    args.addAll(Arrays.asList(
+      "-input", (new Path(getInputDir(), "text.txt")).toString(),
+      "-output", getOutputDir().toString(),
+      "-mapper", badMapper,
+      "-reducer", badReducer,
+      "-verbose",
+      "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
+      "-jobconf", "mapred.map.max.attempts="+mapperAttempts,
+      "-jobconf", "mapred.reduce.max.attempts="+reducerAttempts));
+    if(disableSkip) {
+      args.addAll(Arrays.asList(
+        "-jobconf", "mapred.skip.mode.enabled=false"));
+    }
+    args.addAll(Arrays.asList(
+      "-jobconf", "mapred.map.tasks=1",
+      "-jobconf", "mapred.reduce.tasks=1",
+      "-jobconf", "mapred.task.timeout=30000",
+      "-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
+      "-jobconf", "mapred.job.tracker="+clusterConf.get("mapred.job.tracker"),
+      "-jobconf", "mapred.job.tracker.http.address="
+                    +clusterConf.get("mapred.job.tracker.http.address"),
+      "-jobconf", "stream.debug=set",
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")));
+    return args.toArray(new String[args.size()]);
+  }
+
+  public void testDisableSkip() throws Exception {
+    JobConf clusterConf = createJobConf();
+    createInput();
+
+    StreamJob job = new StreamJob(buildArgs(clusterConf, true), false);
+    job.go();
+    assertFalse(job.running_.isSuccessful());
+  }
+
+  public void testSkip() throws Exception {
+    JobConf clusterConf = createJobConf();
+    createInput();
+
+    StreamJob job = new StreamJob(buildArgs(clusterConf, false), false);
+    job.go();
+    validateOutput(job.running_, false);
+  }
+
+  /**
+   * Streaming command that echoes every stdin line to stdout and, after each
+   * batch of 10 lines, reports the batch size on stderr as an update to the
+   * map or reduce processed-records counter.
+   */
+  static class App{
+    boolean isReducer;
+
+    // NOTE: the whole read loop runs inside the constructor and calls the
+    // overridable processLine(); subclasses must not rely on their own
+    // instance state in processLine().
+    public App(String[] args) throws Exception{
+      if(args.length>0) {
+        isReducer = Boolean.parseBoolean(args[0]);
+      }
+      String counter = Counters.Application.MAP_PROCESSED_RECORDS;
+      if(isReducer) {
+        counter = Counters.Application.REDUCE_PROCESSED_RECORDS;
+      }
+      BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+      String line;
+      int count = 0;
+      while ((line = in.readLine()) != null) {
+        processLine(line);
+        count++;
+        if(count>=10) {
+          System.err.println("reporter:counter:"+Counters.Application.GROUP+","+
+              counter+","+count);
+          count = 0;
+        }
+      }
+    }
+
+    protected void processLine(String line) throws Exception{
+      System.out.println(line);
+    }
+
+
+    public static void main(String[] args) throws Exception{
+      new App(args);
+    }
+  }
+
+  /**
+   * App variant that fails on the configured bad records: it exits on the
+   * first one, throws on the second and sleeps for 15 minutes on the third.
+   */
+  static class BadApp extends App{
+
+    public BadApp(String[] args) throws Exception {
+      super(args);
+    }
+
+    protected void processLine(String line) throws Exception {
+      List<String> badRecords = MAPPER_BAD_RECORDS;
+      if(isReducer) {
+        badRecords = REDUCER_BAD_RECORDS;
+      }
+      if(badRecords.size()>0 && line.contains(badRecords.get(0))) {
+        LOG.warn("Encountered BAD record");
+        System.exit(-1);
+      }
+      else if(badRecords.size()>1 && line.contains(badRecords.get(1))) {
+        LOG.warn("Encountered BAD record");
+        throw new Exception("Got bad record..crashing");
+      }
+      else if(badRecords.size()>2 && line.contains(badRecords.get(2))) {
+        LOG.warn("Encountered BAD record");
+        Thread.sleep(15*60*1000);
+      }
+      super.processLine(line);
+    }
+
+    public static void main(String[] args) throws Exception{
+      new BadApp(args);
+    }
+  }
+
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java Mon Aug 11 05:03:37 2008
@@ -488,4 +488,14 @@
}
return buffer.toString();
}
+
+ public static class Application {
+ //Names of the special counters which are written by the application and
+ //are used by the framework.
+ public static final String GROUP = "ApplicationCounters"; // counter group name
+ public static final String MAP_PROCESSED_RECORDS = "MapProcessedRecords"; // counter name
+ public static final String REDUCE_PROCESSED_RECORDS =
+ "ReduceProcessedRecords"; // counter name
+
+ }
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Mon Aug 11 05:03:37 2008
@@ -44,8 +44,9 @@
* version 12 changes the counters representation for HADOOP-1915
* version 13 added call getBuildVersion() for HADOOP-236
* Version 14: replaced getFilesystemName with getSystemDir for HADOOP-3135
+ * Version 15: Changed format of Task and TaskStatus for HADOOP-153
*/
- public static final long versionID = 14L;
+ public static final long versionID = 15L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Mon Aug 11 05:03:37 2008
@@ -91,6 +91,11 @@
int fromEventId, int maxLocs) throws IOException {
return TaskCompletionEvent.EMPTY_ARRAY;
}
+
+ public void reportNextRecordRange(TaskAttemptID taskid,
+ SortedRanges.Range range) throws IOException {
+ LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
+ }
}
private static ClassLoader makeClassLoader(JobConf conf,
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Mon Aug 11 05:03:37 2008
@@ -277,6 +277,11 @@
public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
// Ignore for now
}
+
+ public void reportNextRecordRange(TaskAttemptID taskid,
+ SortedRanges.Range range) throws IOException {
+ LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
+ }
public boolean ping(TaskAttemptID taskid) throws IOException {
return true;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java Mon Aug 11 05:03:37 2008
@@ -27,10 +27,13 @@
implements MapRunnable<K1, V1, K2, V2> {
private Mapper<K1, V1, K2, V2> mapper;
+ private boolean incrProcCount;
@SuppressWarnings("unchecked")
public void configure(JobConf job) {
this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
+ this.incrProcCount = job.getBoolean("mapred.skip.on", false) &&
+ SkipBadRecords.getAutoIncrMapperProcCount(job);
}
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
@@ -44,6 +47,10 @@
while (input.next(key, value)) {
// map pair to output
mapper.map(key, value, output, reporter);
+ if(incrProcCount) {
+ reporter.incrCounter(Counters.Application.GROUP,
+ Counters.Application.MAP_PROCESSED_RECORDS, 1);
+ }
}
} finally {
mapper.close();
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Mon Aug 11 05:03:37 2008
@@ -31,6 +31,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -142,11 +143,19 @@
private RecordReader<K,V> rawIn;
private Counters.Counter inputByteCounter;
private Counters.Counter inputRecordCounter;
+ private Iterator<Long> skipFailedRecIndexIterator;
+ private TaskUmbilicalProtocol umbilical;
+ private long recIndex = -1;
+ private long beforePos = -1;
+ private long afterPos = -1;
- TrackedRecordReader(RecordReader<K,V> raw, Counters counters) {
+ TrackedRecordReader(RecordReader<K,V> raw, Counters counters,
+ TaskUmbilicalProtocol umbilical) {
rawIn = raw;
+ this.umbilical = umbilical;
inputRecordCounter = counters.findCounter(MAP_INPUT_RECORDS);
inputByteCounter = counters.findCounter(MAP_INPUT_BYTES);
+ skipFailedRecIndexIterator = getFailedRanges().skipRangeIterator();
}
public K createKey() {
@@ -158,17 +167,34 @@
}
public synchronized boolean next(K key, V value)
- throws IOException {
-
- setProgress(getProgress());
- long beforePos = getPos();
- boolean ret = rawIn.next(key, value);
+ throws IOException {
+ boolean ret = moveToNext(key, value);
+ if(isSkipping() && ret) {
+ long nextRecIndex = skipFailedRecIndexIterator.next();
+ long skip = nextRecIndex - recIndex;
+ for(int i=0;i<skip && ret;i++) {
+ ret = moveToNext(key, value);
+ }
+ getCounters().incrCounter(Counter.MAP_SKIPPED_RECORDS, skip);
+ reportNextRecordRange(umbilical, nextRecIndex);
+ }
if (ret) {
inputRecordCounter.increment(1);
- inputByteCounter.increment(getPos() - beforePos);
+ inputByteCounter.increment(afterPos - beforePos);
}
return ret;
}
+
+ private synchronized boolean moveToNext(K key, V value)
+ throws IOException {
+ setProgress(getProgress());
+ beforePos = getPos();
+ boolean ret = rawIn.next(key, value);
+ recIndex++;
+ afterPos = getPos();
+ return ret;
+ }
+
public long getPos() throws IOException { return rawIn.getPos(); }
public void close() throws IOException { rawIn.close(); }
public float getProgress() throws IOException {
@@ -218,7 +244,8 @@
RecordReader rawIn = // open input
job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
- RecordReader in = new TrackedRecordReader(rawIn, getCounters());
+ RecordReader in = new TrackedRecordReader(rawIn, getCounters(), umbilical);
+ job.setBoolean("mapred.skip.on", isSkipping());
MapRunnable runner =
ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Mon Aug 11 05:03:37 2008
@@ -241,10 +241,45 @@
}
}
+ /**
+  * ReduceValuesIterator that steps over keys whose record index is not
+  * returned by the failed-ranges skip iterator, counts the skipped records
+  * and reports the index of the next record to the umbilical.
+  */
+ private class SkippingReduceValuesIterator<KEY,VALUE>
+     extends ReduceValuesIterator<KEY,VALUE> {
+   // yields, in order, the record indices that should be processed
+   private Iterator<Long> skipFailedRecIndexIterator;
+   private TaskUmbilicalProtocol umbilical;
+   // index of the key the iterator is currently positioned on
+   private long recIndex = -1;
+
+   public SkippingReduceValuesIterator(RawKeyValueIterator in,
+       RawComparator<KEY> comparator, Class<KEY> keyClass,
+       Class<VALUE> valClass, Configuration conf, Progressable reporter,
+       TaskUmbilicalProtocol umbilical) throws IOException {
+     super(in, comparator, keyClass, valClass, conf, reporter);
+     this.umbilical = umbilical;
+     skipFailedRecIndexIterator = getFailedRanges().skipRangeIterator();
+     // the very first key may already lie in a range to be skipped
+     mayBeSkip();
+   }
+
+   void nextKey() throws IOException {
+     super.nextKey();
+     mayBeSkip();
+   }
+
+   /**
+    * Advances past every key between the current position and the next
+    * index handed out by the skip iterator, then records the skip count and
+    * reports the next record index.
+    */
+   private void mayBeSkip() throws IOException {
+     recIndex++;
+     long nextRecIndex = skipFailedRecIndexIterator.next();
+     long skip = nextRecIndex - recIndex;
+     // the loop counter must be a long: an int counter would wrap before
+     // reaching a skip distance larger than Integer.MAX_VALUE and the loop
+     // would then run on until the input is exhausted.
+     for(long i=0;i<skip && super.more();i++) {
+       super.nextKey();
+       recIndex++;
+     }
+     getCounters().incrCounter(Counter.REDUCE_SKIPPED_RECORDS, skip);
+     reportNextRecordRange(umbilical, nextRecIndex);
+   }
+ }
+
@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException {
+ job.setBoolean("mapred.skip.on", isSkipping());
Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
// start thread that will handle communication with parent
@@ -314,14 +349,24 @@
try {
Class keyClass = job.getMapOutputKeyClass();
Class valClass = job.getMapOutputValueClass();
+ boolean incrProcCount = isSkipping() &&
+ SkipBadRecords.getAutoIncrReducerProcCount(job);
- ReduceValuesIterator values = new ReduceValuesIterator(rIter,
+ ReduceValuesIterator values = isSkipping() ?
+ new SkippingReduceValuesIterator(rIter,
+ job.getOutputValueGroupingComparator(), keyClass, valClass,
+ job, reporter, umbilical) :
+ new ReduceValuesIterator(rIter,
job.getOutputValueGroupingComparator(), keyClass, valClass,
job, reporter);
values.informReduceProgress();
while (values.more()) {
reduceInputKeyCounter.increment(1);
reducer.reduce(values.getKey(), values, collector, reporter);
+ if(incrProcCount) {
+ reporter.incrCounter(Counters.Application.GROUP,
+ Counters.Application.REDUCE_PROCESSED_RECORDS, 1);
+ }
values.nextKey();
values.informReduceProgress();
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java?rev=684731&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java Mon Aug 11 05:03:37 2008
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Utility class for skip bad records functionality. It contains various
+ * settings related to skipping of bad records.
+ */
+public class SkipBadRecords {
+
+ private static final String ENABLED = "mapred.skip.mode.enabled";
+ private static final String ATTEMPTS_TO_START_SKIPPING =
+ "mapred.skip.attempts.to.start.skipping";
+ private static final String AUTO_INCR_MAP_PROC_COUNT =
+ "mapred.skip.map.auto.incr.proc.count";
+ private static final String AUTO_INCR_REDUCE_PROC_COUNT =
+ "mapred.skip.reduce.auto.incr.proc.count";
+
+ /**
+ * Is skipping of bad records enabled. If it is enabled
+ * the framework will try to find bad records and skip
+ * them on further attempts.
+ *
+ * @param conf the configuration
+ * @return <code>true</code> if skipping is enabled
+ * <code>false</code> otherwise.
+ */
+ public static boolean getEnabled(Configuration conf) {
+ return conf.getBoolean(ENABLED, true);
+ }
+
+ /**
+ * Set whether to enable skipping of bad records. If it is enabled
+ * the framework will try to find bad records and will
+ * try to skip them on further attempts.
+ *
+ * @param conf the configuration
+ * @param enabled boolean to enable/disable skipping
+ */
+ public static void setEnabled(Configuration conf, boolean enabled) {
+ conf.setBoolean(ENABLED, enabled);
+ }
+
+ /**
+ * Get the number of Task attempts AFTER which skip mode
+ * will be kicked off. When skip mode is kicked off, the
+ * tasks reports the range of records which it will process
+ * next to the TaskTracker. So that on failures, TT knows which
+ * ones are possibly the bad records. On further executions,
+ * those are skipped.
+ *
+ * @param conf the configuration
+ * @return attemptsToStartSkipping no of task attempts
+ */
+ public static int getAttemptsToStartSkipping(Configuration conf) {
+ return conf.getInt(ATTEMPTS_TO_START_SKIPPING, 2);
+ }
+
+ /**
+ * Set the number of Task attempts AFTER which skip mode
+ * will be kicked off. When skip mode is kicked off, the
+ * tasks reports the range of records which it will process
+ * next to the TaskTracker. So that on failures, TT knows which
+ * ones are possibly the bad records. On further executions,
+ * those are skipped.
+ *
+ * @param conf the configuration
+ * @param attemptsToStartSkipping no of task attempts
+ */
+ public static void setAttemptsToStartSkipping(Configuration conf,
+ int attemptsToStartSkipping) {
+ conf.setInt(ATTEMPTS_TO_START_SKIPPING, attemptsToStartSkipping);
+ }
+
+ /**
+ * Get the flag which if set to true,
+ * Counters.Application.MAP_PROCESSED_RECORDS is incremented
+ * by MapRunner after invoking the map function. This value must be set to
+ * false for applications which process the records asynchronously
+ * or buffer the input records. For example streaming.
+ * In such cases applications should increment this counter on their own.
+ *
+ * @param conf the configuration
+ * @return <code>true</code> if auto increment
+ * Counters.Application.MAP_PROCESSED_RECORDS.
+ * <code>false</code> otherwise.
+ */
+ public static boolean getAutoIncrMapperProcCount(Configuration conf) {
+ return conf.getBoolean(AUTO_INCR_MAP_PROC_COUNT, true);
+ }
+
+ /**
+ * Set the flag which if set to true,
+ * Counters.Application.MAP_PROCESSED_RECORDS is incremented
+ * by MapRunner after invoking the map function. This value must be set to
+ * false for applications which process the records asynchronously
+ * or buffer the input records. For example streaming.
+ * In such cases applications should increment this counter on their own.
+ *
+ * @param conf the configuration
+ * @param autoIncr whether to auto increment
+ * Counters.Application.MAP_PROCESSED_RECORDS.
+ */
+ public static void setAutoIncrMapperProcCount(Configuration conf,
+ boolean autoIncr) {
+ conf.setBoolean(AUTO_INCR_MAP_PROC_COUNT, autoIncr);
+ }
+
+ /**
+ * Get the flag which if set to true,
+ * Counters.Application.REDUCE_PROCESSED_RECORDS is incremented
+ * by framework after invoking the reduce function. This value must be set to
+ * false for applications which process the records asynchronously
+ * or buffer the input records. For example streaming.
+ * In such cases applications should increment this counter on their own.
+ *
+ * @param conf the configuration
+ * @return <code>true</code> if auto increment
+ * Counters.Application.REDUCE_PROCESSED_RECORDS.
+ * <code>false</code> otherwise.
+ */
+ public static boolean getAutoIncrReducerProcCount(Configuration conf) {
+ return conf.getBoolean(AUTO_INCR_REDUCE_PROC_COUNT, true);
+ }
+
+ /**
+ * Set the flag which if set to true,
+ * Counters.Application.REDUCE_PROCESSED_RECORDS is incremented
+ * by framework after invoking the reduce function. This value must be set to
+ * false for applications which process the records asynchronously
+ * or buffer the input records. For example streaming.
+ * In such cases applications should increment this counter on their own.
+ *
+ * @param conf the configuration
+ * @param autoIncr whether to auto increment
+ * Counters.Application.REDUCE_PROCESSED_RECORDS.
+ */
+ public static void setAutoIncrReducerProcCount(Configuration conf,
+ boolean autoIncr) {
+ conf.setBoolean(AUTO_INCR_REDUCE_PROC_COUNT, autoIncr);
+ }
+
+}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java?rev=684731&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java Mon Aug 11 05:03:37 2008
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Keeps the Ranges sorted by startIndex.
+ * The added ranges are always ensured to be non-overlapping.
+ * Provides the SkipRangeIterator, which skips the Ranges
+ * stored in this object.
+ */
+public class SortedRanges implements Writable{
+
+ private static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.mapred.SortedRanges");
+
+ private SortedSet<Range> ranges = new TreeSet<Range>();
+ private int indicesCount;
+
+ /**
+ * Creates a new Iterator over indices which leaves out every index
+ * that lies inside one of the stored ranges.
+ * Successive Iterator.next() calls return increasing indices, counting
+ * up from 0 and jumping past each stored range that is reached.
+ * The iterator walks the ranges as obtained from the stored set at the
+ * time of this call.
+ * @return a new Iterator&lt;Long&gt; of the indices that are not skipped
+ */
+ public Iterator<Long> skipRangeIterator(){
+ return new SkipRangeIterator();
+ }
+
+ /**
+ * Get the number of indices covered by the stored ranges, i.e. the
+ * running total of range lengths which is increased whenever a range is
+ * stored and decreased whenever a stored range is taken out.
+ * NOTE(review): the counter is an int while Range lengths are long, so
+ * very long ranges could overflow it.
+ * @return indices count
+ */
+ public synchronized int getIndicesCount() {
+ return indicesCount;
+ }
+
+ /**
+ * Add the range indices. It is ensured that the added range
+ * doesn't overlap the existing ranges. If it overlaps, the
+ * existing overlapping ranges are removed and a single range
+ * having the superset of all the removed ranges and this range
+ * is added.
+ * Only the closest preceding range is examined, and it is merged when
+ * the new range starts before its (exclusive) end. Following ranges are
+ * merged when they start at or before the end of the new range, so a
+ * following range which merely touches the new range is merged as well.
+ * The indices count is adjusted for every range removed and for the
+ * merged range finally stored.
+ * If the range is of 0 length, doesn't do anything.
+ * @param range Range to be added.
+ */
+ public synchronized void add(Range range){
+ if(range.isEmpty()) {
+ return;
+ }
+
+ long startIndex = range.getStartIndex();
+ long endIndex = range.getEndIndex();
+ //look at the last range ordered before the new one; merge it if it overlaps
+ SortedSet<Range> headSet = ranges.headSet(range);
+ if(headSet.size()>0) {
+ Range previousRange = headSet.last();
+ LOG.debug("previousRange "+previousRange);
+ if(startIndex<previousRange.getEndIndex()) {
+ //previousRange overlaps this range
+ //remove the previousRange (it is re-added below as part of the merged range)
+ if(ranges.remove(previousRange)) {
+ indicesCount-=previousRange.getLength();
+ }
+ //expand this range so that it covers previousRange as well
+ startIndex = previousRange.getStartIndex();
+ endIndex = endIndex>=previousRange.getEndIndex() ?
+ endIndex : previousRange.getEndIndex();
+ }
+ }
+
+ Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
+ while(tailSetIt.hasNext()) {
+ Range nextRange = tailSetIt.next();
+ LOG.debug("nextRange "+nextRange +" startIndex:"+startIndex+
+ " endIndex:"+endIndex);
+ if(endIndex>=nextRange.getStartIndex()) {
+ //nextRange overlaps (or touches) this range
+ //remove the nextRange through the iterator
+ tailSetIt.remove();
+ indicesCount-=nextRange.getLength();
+ if(endIndex<nextRange.getEndIndex()) {
+ //expand this range up to the end of nextRange and stop looking further
+ endIndex = nextRange.getEndIndex();
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ add(startIndex,endIndex);
+ }
+
+ /**
+ * Remove the range indices. If this range is
+ * found in existing ranges, the existing ranges
+ * are shrunk: the part of a stored range lying before the start of this
+ * range and the part lying after its end are kept, so a stored range
+ * which contains this range entirely is split in two, and a stored
+ * range lying completely inside this range disappears.
+ * The indices count is adjusted for every range taken out and for every
+ * remaining part stored again.
+ * If range is of 0 length, doesn't do anything.
+ * @param range Range to be removed.
+ */
+ public synchronized void remove(Range range) {
+ if(range.isEmpty()) {
+ return;
+ }
+ long startIndex = range.getStartIndex();
+ long endIndex = range.getEndIndex();
+ //look at the last range ordered before the one being removed
+ SortedSet<Range> headSet = ranges.headSet(range);
+ if(headSet.size()>0) {
+ Range previousRange = headSet.last();
+ LOG.debug("previousRange "+previousRange);
+ if(startIndex<previousRange.getEndIndex()) {
+ //previousRange overlaps this range
+ //narrow down the previousRange: take it out, then store what is left of it
+ if(ranges.remove(previousRange)) {
+ indicesCount-=previousRange.getLength();
+ LOG.debug("removed previousRange "+previousRange);
+ }
+ add(previousRange.getStartIndex(), startIndex);
+ if(endIndex<=previousRange.getEndIndex()) {
+ add(endIndex, previousRange.getEndIndex());
+ }
+ }
+ }
+
+ Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
+ while(tailSetIt.hasNext()) {
+ Range nextRange = tailSetIt.next();
+ LOG.debug("nextRange "+nextRange +" startIndex:"+startIndex+
+ " endIndex:"+endIndex);
+ if(endIndex>nextRange.getStartIndex()) {
+ //nextRange overlaps this range
+ //narrow down the nextRange: take it out through the iterator
+ tailSetIt.remove();
+ indicesCount-=nextRange.getLength();
+ if(endIndex<nextRange.getEndIndex()) {
+ //keep the part of nextRange after the removed range; the loop is left right after storing it
+ add(endIndex, nextRange.getEndIndex());
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Stores the indices from start (inclusive) to end (exclusive) as a new
+ * Range and adds its length to the indices count.
+ * Does nothing when end is not greater than start.
+ * @param start first index of the range
+ * @param end index just past the last index of the range
+ */
+ private void add(long start, long end) {
+ if(end<=start) {
+ //nothing to store
+ return;
+ }
+ long span = end-start;
+ Range recRange = new Range(start, span);
+ ranges.add(recRange);
+ indicesCount+=recRange.getLength();
+ LOG.debug("added "+recRange);
+ }
+
+ /**
+ * Replaces the contents of this object with the ranges read from the
+ * given input: an int holding the number of ranges, followed by that
+ * many serialized Ranges.
+ * The indices count is rebuilt from the ranges read, so that
+ * getIndicesCount() is also correct on a deserialized object; the
+ * serialized form itself is unchanged.
+ * The new state is installed only after everything has been read, and
+ * the method is synchronized like the other methods which modify the
+ * ranges and the indices count.
+ * @param in the input to read from
+ * @throws IOException if reading from the input fails
+ */
+ public synchronized void readFields(DataInput in) throws IOException {
+ SortedSet<Range> readRanges = new TreeSet<Range>();
+ int readCount = 0;
+ int size = in.readInt();
+ for(int i=0;i<size;i++) {
+ Range range = new Range();
+ range.readFields(in);
+ //count a range only if it was really stored
+ if(readRanges.add(range)) {
+ readCount+=range.getLength();
+ }
+ }
+ ranges = readRanges;
+ indicesCount = readCount;
+ }
+
+ /**
+ * Serializes the stored ranges: the number of ranges as an int,
+ * followed by every Range in the iteration order of the set.
+ * @param out the output to write to
+ * @throws IOException if writing to the output fails
+ */
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(ranges.size());
+ for(Range each : ranges) {
+ each.write(out);
+ }
+ }
+
+ /**
+ * Lists the stored ranges, one per line, each in its own toString()
+ * form followed by a newline.
+ * @return the listing; an empty String if no ranges are stored
+ */
+ public String toString() {
+ StringBuffer listing = new StringBuffer();
+ for(Range each : ranges) {
+ listing.append(each.toString()).append("\n");
+ }
+ return listing.toString();
+ }
+
+ /**
+ * Index Range. Comprises of start index and length.
+ * A Range can be of 0 length also. The Range stores indices
+ * of type long. The start index is inclusive, the end index
+ * (start index + length) is exclusive.
+ * Ranges are ordered by start index and, for equal start indices, by
+ * length, so that compareTo returns 0 exactly for equal Ranges and
+ * a.compareTo(b) always has the opposite sign of b.compareTo(a).
+ */
+ static class Range implements Comparable<Range>, Writable{
+ private long startIndex;
+ private long length;
+
+ /**
+ * Creates a Range of the given length beginning at startIndex.
+ * @param startIndex the first index of the range
+ * @param length the number of indices in the range
+ * @throws RuntimeException if length is negative
+ */
+ public Range(long startIndex, long length) {
+ if(length<0) {
+ throw new RuntimeException("length can't be negative");
+ }
+ this.startIndex = startIndex;
+ this.length = length;
+ }
+
+ /**
+ * Creates an empty Range beginning at index 0.
+ */
+ public Range() {
+ this(0,0);
+ }
+
+ /**
+ * Get the start index. Start index is inclusive.
+ * @return startIndex.
+ */
+ public long getStartIndex() {
+ return startIndex;
+ }
+
+ /**
+ * Get the end index. End index is exclusive.
+ * @return endIndex.
+ */
+ public long getEndIndex() {
+ return startIndex + length;
+ }
+
+ /**
+ * Get Length.
+ * @return length
+ */
+ public long getLength() {
+ return length;
+ }
+
+ /**
+ * Range is empty if its length is zero.
+ * @return <code>true</code> if empty
+ * <code>false</code> otherwise.
+ */
+ public boolean isEmpty() {
+ return length==0;
+ }
+
+ /**
+ * Two Ranges are equal if both start index and length are the same.
+ */
+ public boolean equals(Object o) {
+ //instanceof is false for null, no separate null check is needed
+ if(o instanceof Range) {
+ Range range = (Range)o;
+ return startIndex==range.startIndex &&
+ length==range.length;
+ }
+ return false;
+ }
+
+ public int hashCode() {
+ return Long.valueOf(startIndex).hashCode() +
+ Long.valueOf(length).hashCode();
+ }
+
+ /**
+ * Orders Ranges by start index; Ranges with the same start index are
+ * ordered by length. Deciding the tie by length keeps the ordering
+ * antisymmetric: two different Ranges with the same start index must
+ * not both claim to be the smaller one, otherwise a sorted set cannot
+ * locate them reliably.
+ * @param o the Range to compare with
+ * @return a negative value, 0 or a positive value if this Range is
+ * ordered before, is equal to or is ordered after the given Range
+ */
+ public int compareTo(Range o) {
+ if(this.startIndex != o.startIndex) {
+ return (this.startIndex > o.startIndex) ? 1:-1;
+ }
+ if(this.length != o.length) {
+ return (this.length > o.length) ? 1:-1;
+ }
+ return 0;
+ }
+
+ /**
+ * Reads the start index and then the length, each as a long.
+ */
+ public void readFields(DataInput in) throws IOException {
+ startIndex = in.readLong();
+ length = in.readLong();
+ }
+
+ /**
+ * Writes the start index and then the length, each as a long.
+ */
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(startIndex);
+ out.writeLong(length);
+ }
+
+ /**
+ * @return the Range in the form startIndex:length
+ */
+ public String toString() {
+ return startIndex +":" + length;
+ }
+ }
+
+ /**
+ * Iterator over indices which steps over every index lying inside one
+ * of the stored ranges.
+ */
+ private class SkipRangeIterator implements Iterator<Long> {
+ Iterator<Range> rangeIterator = ranges.iterator();
+ Range range = new Range();
+ long currentIndex = -1;
+
+ /**
+ * Tells whether a further index can be handed out, which is the case
+ * as long as the current index has not arrived at Long.MAX_VALUE.
+ * @return <code>true</code> next index exists.
+ * <code>false</code> otherwise.
+ */
+ public boolean hasNext() {
+ return currentIndex!=Long.MAX_VALUE;
+ }
+
+ /**
+ * Advances to the next index which is not inside a stored range and
+ * returns it. The first index handed out is 0, unless 0 is skipped.
+ * @return next index
+ */
+ public synchronized Long next() {
+ currentIndex+=1;
+ LOG.debug("currentIndex "+currentIndex +" "+range);
+ skipIfInRange();
+ //fetch further ranges while the one at hand ends at or before the
+ //current index, jumping over each fetched range that is hit
+ while(range.getEndIndex()<=currentIndex && rangeIterator.hasNext()) {
+ range = rangeIterator.next();
+ skipIfInRange();
+ }
+ return Long.valueOf(currentIndex);
+ }
+
+ /**
+ * Moves the current index to the end of the range at hand if the
+ * current index lies inside that range.
+ */
+ private void skipIfInRange() {
+ long rangeEnd = range.getEndIndex();
+ if(currentIndex<range.getStartIndex() || currentIndex>=rangeEnd) {
+ //not inside the range, nothing to skip
+ return;
+ }
+ LOG.warn("Skipping index " + currentIndex +"-" + rangeEnd);
+ currentIndex = rangeEnd;
+ }
+
+ /**
+ * Remove is not supported. Doesn't apply.
+ */
+ public void remove() {
+ throw new UnsupportedOperationException("remove not supported.");
+ }
+
+ }
+
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Mon Aug 11 05:03:37 2008
@@ -64,13 +64,15 @@
protected static enum Counter {
MAP_INPUT_RECORDS,
MAP_OUTPUT_RECORDS,
+ MAP_SKIPPED_RECORDS,
MAP_INPUT_BYTES,
MAP_OUTPUT_BYTES,
COMBINE_INPUT_RECORDS,
COMBINE_OUTPUT_RECORDS,
REDUCE_INPUT_GROUPS,
REDUCE_INPUT_RECORDS,
- REDUCE_OUTPUT_RECORDS
+ REDUCE_OUTPUT_RECORDS,
+ REDUCE_SKIPPED_RECORDS
}
/**
@@ -109,6 +111,15 @@
TaskStatus taskStatus; // current status of the task
private Path taskOutputPath; // task-specific output dir
+ //failed ranges from previous attempts
+ private SortedRanges failedRanges = new SortedRanges();
+ private boolean skipping = false;
+
+ //currently processing record start index
+ private volatile long currentRecStartIndex;
+ private Iterator<Long> currentRecIndexIterator =
+ failedRanges.skipRangeIterator();
+
protected JobConf conf;
protected MapOutputFile mapOutputFile = new MapOutputFile();
protected LocalDirAllocator lDirAlloc;
@@ -175,6 +186,37 @@
protected synchronized void setPhase(TaskStatus.Phase phase){
this.taskStatus.setPhase(phase);
}
+
+ /**
+ * Get failed ranges.
+ * @return the failedRanges held by this task (the object itself, not a copy)
+ */
+ public SortedRanges getFailedRanges() {
+ return failedRanges;
+ }
+
+ /**
+ * Set failed ranges. Only the reference held by this task is replaced.
+ * NOTE(review): the record index iterator of this task is not re-created
+ * here; presumably that is fine because readFields builds a fresh one
+ * from the failed ranges it reads -- confirm.
+ * @param failedRanges the failed ranges this task should hold
+ */
+ public void setFailedRanges(SortedRanges failedRanges) {
+ this.failedRanges = failedRanges;
+ }
+
+ /**
+ * Is Task in skipping mode.
+ * @return <code>true</code> if the skipping flag of this task is set,
+ * <code>false</code> otherwise.
+ */
+ public boolean isSkipping() {
+ return skipping;
+ }
+
+ /**
+ * Sets whether to run Task in skipping mode.
+ * @param skipping <code>true</code> to run the task in skipping mode,
+ * <code>false</code> otherwise.
+ */
+ public void setSkipping(boolean skipping) {
+ this.skipping = skipping;
+ }
////////////////////////////////////////////
// Writable methods
@@ -190,6 +232,8 @@
Text.writeString(out, "");
}
taskStatus.write(out);
+ failedRanges.write(out);
+ out.writeBoolean(skipping);
}
public void readFields(DataInput in) throws IOException {
jobFile = Text.readString(in);
@@ -203,6 +247,10 @@
}
taskStatus.readFields(in);
this.mapOutputFile.setJobId(taskId.getJobID());
+ failedRanges.readFields(in);
+ currentRecIndexIterator = failedRanges.skipRangeIterator();
+ currentRecStartIndex = currentRecIndexIterator.next();
+ skipping = in.readBoolean();
}
@Override
@@ -370,6 +418,17 @@
if (counters != null) {
counters.incrCounter(group, counter, amount);
}
+ if(skipping && Counters.Application.GROUP.equals(group) && (
+ Counters.Application.MAP_PROCESSED_RECORDS.equals(counter) ||
+ Counters.Application.REDUCE_PROCESSED_RECORDS.equals(counter))) {
+ //if application reports the processed records, move the
+ //currentRecStartIndex to the next.
+ //currentRecStartIndex is the start index which has not yet been
+ //finished and is still in task's stomach.
+ for(int i=0;i<amount;i++) {
+ currentRecStartIndex = currentRecIndexIterator.next();
+ }
+ }
setProgressFlag();
}
public InputSplit getInputSplit() throws UnsupportedOperationException {
@@ -377,6 +436,25 @@
}
};
}
+
+ /**
+ * Reports the next executing record range to TaskTracker.
+ * The range starts at currentRecStartIndex and ends with nextRecIndex,
+ * both inclusive. It is also stored in the task status before it is
+ * sent over the umbilical.
+ *
+ * @param umbilical the umbilical used to send the range
+ * @param nextRecIndex the record index which would be fed next.
+ * @throws IOException if thrown by the umbilical call
+ */
+ protected void reportNextRecordRange(final TaskUmbilicalProtocol umbilical,
+ long nextRecIndex) throws IOException{
+ //currentRecStartIndex is the start index which has not yet been finished
+ //and is still in task's stomach.
+ //+1 because nextRecIndex itself belongs to the range.
+ //NOTE(review): a nextRecIndex more than one below currentRecStartIndex
+ //gives a negative length, which the Range constructor rejects.
+ long len = nextRecIndex - currentRecStartIndex +1;
+ SortedRanges.Range range =
+ new SortedRanges.Range(currentRecStartIndex, len);
+ taskStatus.setNextRecordRange(range);
+ LOG.debug("sending reportNextRecordRange " + range);
+ umbilical.reportNextRecordRange(taskId, range);
+ }
public void setProgress(float progress) {
taskProgress.set(progress);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Mon Aug 11 05:03:37 2008
@@ -77,6 +77,8 @@
private int completes = 0;
private boolean failed = false;
private boolean killed = false;
+ private volatile SortedRanges failedRanges = new SortedRanges();
+ private volatile boolean skipping = false;
// The 'next' usable taskid of this tip
int nextTaskId = 0;
@@ -485,6 +487,12 @@
if (taskState == TaskStatus.State.FAILED) {
numTaskFailures++;
machinesWhereFailed.add(trackerHostName);
+ LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
+ failedRanges.add(status.getNextRecordRange());
+ if(SkipBadRecords.getEnabled(conf) &&
+ numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
+ skipping = true;
+ }
} else {
numKilledTasks++;
}
@@ -677,7 +685,7 @@
// in more depth eventually...
//
- if (activeTasks.size() <= MAX_TASK_EXECS &&
+ if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
(averageProgress - progress >= SPECULATIVE_GAP) &&
(currentTime - startTime >= SPECULATIVE_LAG)
&& completes == 0 && !isOnlyCommitPending()) {
@@ -709,12 +717,16 @@
}
if (isMapTask()) {
+ LOG.debug("attemdpt "+ numTaskFailures +
+ " sending skippedRecords "+failedRanges.getIndicesCount());
t = new MapTask(jobFile, taskid, partition,
rawSplit.getClassName(), rawSplit.getBytes());
} else {
t = new ReduceTask(jobFile, taskid, partition, numMaps);
}
t.setConf(conf);
+ t.setFailedRanges(failedRanges);
+ t.setSkipping(skipping);
tasks.put(taskid, t);
activeTasks.put(taskid, taskTracker);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Mon Aug 11 05:03:37 2008
@@ -56,6 +56,7 @@
private Phase phase = Phase.STARTING;
private Counters counters;
private boolean includeCounters;
+ private SortedRanges.Range nextRecordRange = new SortedRanges.Range();
public TaskStatus() {}
@@ -89,6 +90,23 @@
}
public String getStateString() { return stateString; }
public void setStateString(String stateString) { this.stateString = stateString; }
+
+ /**
+ * Get the range of records which the Task is going to process next.
+ * @return nextRecordRange; an empty range starting at 0 until a range
+ * has been set or read.
+ */
+ public SortedRanges.Range getNextRecordRange() {
+ return nextRecordRange;
+ }
+
+ /**
+ * Set the range of records which the Task is going to process next.
+ * @param nextRecordRange the range to store in this status
+ */
+ public void setNextRecordRange(SortedRanges.Range nextRecordRange) {
+ this.nextRecordRange = nextRecordRange;
+ }
+
/**
* Get task finish time. if shuffleFinishTime and sortFinishTime
* are not set before, these are set to finishTime. It takes care of
@@ -247,6 +265,7 @@
this.progress = status.getProgress();
this.runState = status.getRunState();
this.stateString = status.getStateString();
+ this.nextRecordRange = status.getNextRecordRange();
setDiagnosticInfo(status.getDiagnosticInfo());
@@ -297,6 +316,7 @@
if (includeCounters) {
counters.write(out);
}
+ nextRecordRange.write(out);
}
public void readFields(DataInput in) throws IOException {
@@ -313,6 +333,7 @@
if (includeCounters) {
counters.readFields(in);
}
+ nextRecordRange.readFields(in);
}
//////////////////////////////////////////////////////////////////////////////
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Aug 11 05:03:37 2008
@@ -1574,6 +1574,10 @@
public synchronized void reportDiagnosticInfo(String info) {
this.diagnosticInfo.append(info);
}
+
+ /**
+ * Stores the record range reported by the task in the task status.
+ * @param range the range of records the task is going to process next
+ */
+ public synchronized void reportNextRecordRange(SortedRanges.Range range) {
+ this.taskStatus.setNextRecordRange(range);
+ }
/**
* The task is reporting that it's done running
@@ -1977,6 +1981,17 @@
LOG.warn("Error from unknown child task: "+taskid+". Ignored.");
}
}
+
+ /**
+ * Hands the record range reported by a child task over to the task it
+ * belongs to. A report for a task id which is not known is logged with
+ * a warning and otherwise ignored.
+ * @param taskid the id of the reporting task
+ * @param range the reported range of records
+ */
+ public synchronized void reportNextRecordRange(TaskAttemptID taskid,
+ SortedRanges.Range range) throws IOException {
+ TaskInProgress reportingTip = tasks.get(taskid);
+ if (reportingTip == null) {
+ LOG.warn("reportNextRecordRange from unknown child task: "+taskid+". " +
+ "Ignored.");
+ return;
+ }
+ reportingTip.reportNextRecordRange(range);
+ }
/** Child checking to see if we're alive. Normally does nothing.*/
public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Mon Aug 11 05:03:37 2008
@@ -43,9 +43,11 @@
* Version 8 changes {job|tip|task}id's to use their corresponding
* objects rather than strings.
* Version 9 changes the counter representation for HADOOP-1915
+ * Version 10 changed the TaskStatus format and added reportNextRecordRange
+ * for HADOOP-153
* */
- public static final long versionID = 9L;
+ public static final long versionID = 10L;
/** Called when a child task process starts, to get its task.*/
Task getTask(TaskAttemptID taskid) throws IOException;
@@ -68,6 +70,15 @@
* @param trace the text to report
*/
void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException;
+
+ /**
+ * Report the range of records which the Task is going to process next.
+ * @param taskid the id of the task involved
+ * @param range the range of record sequence numbers
+ * @throws IOException
+ */
+ void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range)
+ throws IOException;
/** Periodically called by child to check if parent is still alive.
* @return True if the task is known
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Mon Aug 11 05:03:37 2008
@@ -6,10 +6,12 @@
MAP_INPUT_BYTES.name= Map input bytes
MAP_OUTPUT_RECORDS.name= Map output records
MAP_OUTPUT_BYTES.name= Map output bytes
+MAP_SKIPPED_RECORDS.name= Map skipped records
COMBINE_INPUT_RECORDS.name= Combine input records
COMBINE_OUTPUT_RECORDS.name= Combine output records
REDUCE_INPUT_GROUPS.name= Reduce input groups
REDUCE_INPUT_RECORDS.name= Reduce input records
REDUCE_OUTPUT_RECORDS.name= Reduce output records
+REDUCE_SKIPPED_RECORDS.name= Reduce skipped records
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Mon Aug 11 05:03:37 2008
@@ -19,12 +19,14 @@
package org.apache.hadoop.mapred.lib;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,6 +60,7 @@
private ExecutorService executorService;
private volatile IOException ioException;
private volatile RuntimeException runtimeException;
+ private boolean incrProcCount;
@SuppressWarnings("unchecked")
public void configure(JobConf jobConf) {
@@ -69,6 +72,8 @@
}
this.job = jobConf;
+ this.incrProcCount = job.getBoolean("mapred.skip.on", false) &&
+ SkipBadRecords.getAutoIncrMapperProcCount(job);
this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
jobConf);
@@ -222,6 +227,10 @@
try {
// map pair to output
MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
+ if(incrProcCount) {
+ reporter.incrCounter(Counters.Application.GROUP,
+ Counters.Application.MAP_PROCESSED_RECORDS, 1);
+ }
} catch (IOException ex) {
// If there is an IOException during the call it is set in an instance
// variable of the MultithreadedMapRunner from where it will be
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Mon Aug 11 05:03:37 2008
@@ -27,6 +27,7 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SkipBadRecords;
/**
* An adaptor to run a C++ mapper.
@@ -42,6 +43,9 @@
*/
public void configure(JobConf job) {
this.job = job;
+ //Disable the auto increment of the processed records counter. For pipes,
+ //the number of processed records can differ from (be equal to or less
+ //than) the number of input records.
+ SkipBadRecords.setAutoIncrMapperProcCount(job, false);
}
/**
@@ -65,6 +69,7 @@
boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
downlink.runMap(reporter.getInputSplit(),
job.getNumReduceTasks(), isJavaInput);
+ boolean skipping = job.getBoolean("mapred.skip.on", false);
try {
if (isJavaInput) {
// allocate key & value instances that are re-used for all entries
@@ -76,6 +81,11 @@
while (input.next(key, value)) {
// map pair to output
downlink.mapItem(key, value);
+ if(skipping) {
+ //flush the streams on every record input if running in skip mode
+ //so that we don't buffer other records surrounding a bad record.
+ downlink.flush();
+ }
}
downlink.endOfInput();
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java?rev=684731&r1=684730&r2=684731&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java Mon Aug 11 05:03:37 2008
@@ -26,6 +26,7 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SkipBadRecords;
import java.io.IOException;
import java.util.Iterator;
@@ -41,9 +42,14 @@
private Application<K2, V2, K3, V3> application = null;
private DownwardProtocol<K2, V2> downlink = null;
private boolean isOk = true;
+ private boolean skipping = false;
public void configure(JobConf job) {
this.job = job;
+ //Disable the auto increment of the processed records counter. For pipes,
+ //the number of processed records can differ from (be equal to or less
+ //than) the number of input records.
+ SkipBadRecords.setAutoIncrReducerProcCount(job, false);
+ //remember whether "mapred.skip.on" is set (false if absent)
+ skipping = job.getBoolean("mapred.skip.on", false);
}
/**
@@ -59,6 +65,11 @@
while (values.hasNext()) {
downlink.reduceValue(values.next());
}
+ if(skipping) {
+ //flush the streams on every record input if running in skip mode
+ //so that we don't buffer other records surrounding a bad record.
+ downlink.flush();
+ }
isOk = true;
}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java?rev=684731&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java Mon Aug 11 05:03:37 2008
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+public class TestBadRecords extends ClusterMapReduceTestCase {
+
+ private static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.mapred.TestBadRecords");
+
+ private static final List<String> MAPPER_BAD_RECORDS =
+ Arrays.asList("hello01","hello04","hello05");
+
+ private static final List<String> REDUCER_BAD_RECORDS =
+ Arrays.asList("hello08","hello10");
+
+ private List<String> input;
+
+ /**
+ * Builds the input records hello01 to hello10; the record number is
+ * left-padded with zeros to two digits.
+ */
+ public TestBadRecords() {
+ input = new ArrayList<String>();
+ for(int recNo=1;recNo<=10;recNo++) {
+ String suffix = ""+recNo;
+ while(suffix.length()<2) {
+ suffix = "0"+suffix;
+ }
+ input.add("hello"+suffix);
+ }
+ }
+
+ /**
+ * Creates the input file, configures a job with a single map and a
+ * single reduce task and a task timeout of 30 seconds, runs the job and
+ * validates its output.
+ * @param conf the job configuration, with mapper and reducer already set
+ * @param mapperBadRecords the records the mapper fails on
+ * @param redBadRecords the records the reducer fails on
+ * @throws Exception if creating the input, running the job or
+ * validating the output fails
+ */
+ private void runMapReduce(JobConf conf,
+ List<String> mapperBadRecords, List<String> redBadRecords)
+ throws Exception {
+ createInput();
+ conf.setJobName("mr");
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(1);
+ conf.setInt("mapred.task.timeout", 30*1000);
+
+ //the number of attempts needed to successfully complete the task depends
+ //on the number of bad records: the attempts before skipping starts, plus
+ //one attempt per bad record, plus one more.
+ conf.setMaxMapAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+1+
+ mapperBadRecords.size());
+ conf.setMaxReduceAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+
+ 1+redBadRecords.size());
+
+ FileInputFormat.setInputPaths(conf, getInputDir());
+ FileOutputFormat.setOutputPath(conf, getOutputDir());
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setMapOutputKeyClass(LongWritable.class);
+ conf.setMapOutputValueClass(Text.class);
+ conf.setOutputFormat(TextOutputFormat.class);
+ conf.setOutputKeyClass(LongWritable.class);
+ conf.setOutputValueClass(Text.class);
+ RunningJob runningJob = JobClient.runJob(conf);
+ validateOutput(conf, runningJob, mapperBadRecords, redBadRecords);
+ }
+
+
+ /**
+ * Writes the input records, one per line, to the file text.txt in the
+ * input directory. The writer is closed even if writing a record fails,
+ * so that the underlying stream is not left open.
+ * @throws Exception if the file cannot be created or written
+ */
+ private void createInput() throws Exception {
+ OutputStream os = getFileSystem().create(new Path(getInputDir(),
+ "text.txt"));
+ Writer wr = new OutputStreamWriter(os);
+ try {
+ for(String inp : input) {
+ wr.write(inp+"\n");
+ }
+ } finally {
+ wr.close();
+ }
+ }
+
+ /**
+ * Checks that the job was successful and that the first output file
+ * holds exactly the records which are neither bad for the mapper nor
+ * bad for the reducer. The reader is closed even if an assertion or a
+ * read fails, so that the output file is not left open.
+ * NOTE(review): nothing is checked about the records when the job
+ * produced no output file at all.
+ * @param conf the configuration the job was run with
+ * @param runningJob the finished job
+ * @param mapperBadRecords the records the mapper fails on
+ * @param redBadRecords the records the reducer fails on
+ * @throws Exception if the output cannot be listed or read
+ */
+ private void validateOutput(JobConf conf, RunningJob runningJob,
+ List<String> mapperBadRecords, List<String> redBadRecords)
+ throws Exception{
+ LOG.info(runningJob.getCounters().toString());
+ assertTrue(runningJob.isSuccessful());
+ Path[] outputFiles = FileUtil.stat2Paths(
+ getFileSystem().listStatus(getOutputDir(),
+ new OutputLogFilter()));
+
+ //expected output: the input without the records bad for the mapper
+ //and without the records bad for the reducer
+ List<String> mapperOutput=getProcessed(input, mapperBadRecords);
+ LOG.debug("mapperOutput " + mapperOutput.size());
+ List<String> reducerOutput=getProcessed(mapperOutput, redBadRecords);
+ LOG.debug("reducerOutput " + reducerOutput.size());
+
+ if (outputFiles.length > 0) {
+ InputStream is = getFileSystem().open(outputFiles[0]);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+ int counter = 0;
+ try {
+ String line = reader.readLine();
+ while (line != null) {
+ counter++;
+ //every output line is key TAB value
+ StringTokenizer tokeniz = new StringTokenizer(line, "\t");
+ String key = tokeniz.nextToken();
+ String value = tokeniz.nextToken();
+ LOG.debug("Output: key:"+key + " value:"+value);
+ assertTrue(value.contains("hello"));
+
+
+ assertTrue(reducerOutput.contains(value));
+ line = reader.readLine();
+ }
+ } finally {
+ reader.close();
+ }
+ assertEquals(reducerOutput.size(), counter);
+ }
+ }
+
+ /**
+ * Returns the given records without the bad ones, in their original order.
+ * @param inputs the records to filter
+ * @param badRecs the records to leave out
+ * @return a new list of the records of inputs not contained in badRecs
+ */
+ private List<String> getProcessed(List<String> inputs, List<String> badRecs) {
+ List<String> kept = new ArrayList<String>();
+ Iterator<String> candidates = inputs.iterator();
+ while(candidates.hasNext()) {
+ String candidate = candidates.next();
+ if(badRecs.contains(candidate)) {
+ continue;
+ }
+ kept.add(candidate);
+ }
+ return kept;
+ }
+
+ /**
+ * Runs a job whose mapper is BadMapper and whose reducer is BadReducer,
+ * with MAPPER_BAD_RECORDS and REDUCER_BAD_RECORDS as the bad records,
+ * and validates the job output.
+ * @throws Exception if running or validating the job fails
+ */
+ public void testBadMapRed() throws Exception {
+ JobConf conf = createJobConf();
+ conf.setMapperClass(BadMapper.class);
+ conf.setReducerClass(BadReducer.class);
+ runMapReduce(conf, MAPPER_BAD_RECORDS, REDUCER_BAD_RECORDS);
+ }
+
+
+ /**
+ * Mapper which passes every record through unchanged, except for the
+ * records in MAPPER_BAD_RECORDS, each of which fails in a different way:
+ * the first exits the JVM, the second throws a RuntimeException and the
+ * third sleeps for 15 minutes.
+ */
+ static class BadMapper extends MapReduceBase implements
+ Mapper<LongWritable, Text, LongWritable, Text> {
+
+ public void map(LongWritable key, Text val,
+ OutputCollector<LongWritable, Text> output, Reporter reporter)
+ throws IOException {
+ String str = val.toString();
+ LOG.debug("MAP key:" +key +" value:" + str);
+ if(MAPPER_BAD_RECORDS.get(0).equals(str)) {
+ LOG.warn("MAP Encountered BAD record");
+ System.exit(-1); //failure by exiting the JVM
+ }
+ else if(MAPPER_BAD_RECORDS.get(1).equals(str)) {
+ LOG.warn("MAP Encountered BAD record");
+ throw new RuntimeException("Bad record "+str); //failure by exception
+ }
+ else if(MAPPER_BAD_RECORDS.get(2).equals(str)) {
+ try {
+ LOG.warn("MAP Encountered BAD record");
+ //hang far longer than the 30 second mapred.task.timeout set by
+ //runMapReduce; presumably so that the attempt is timed out
+ Thread.sleep(15*60*1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ output.collect(key, val);
+ }
+ }
+
+ /**
+ * Reducer which passes every value through unchanged, except for the
+ * values in REDUCER_BAD_RECORDS: the first exits the JVM and the second
+ * sleeps for 15 minutes.
+ */
+ static class BadReducer extends MapReduceBase implements
+ Reducer<LongWritable, Text, LongWritable, Text> {
+
+ public void reduce(LongWritable key, Iterator<Text> values,
+ OutputCollector<LongWritable, Text> output, Reporter reporter)
+ throws IOException {
+ while(values.hasNext()) {
+ Text value = values.next();
+ LOG.debug("REDUCE key:" +key +" value:" + value);
+ if(REDUCER_BAD_RECORDS.get(0).equals(value.toString())) {
+ LOG.warn("REDUCE Encountered BAD record");
+ System.exit(-1); //failure by exiting the JVM
+ }
+ else if(REDUCER_BAD_RECORDS.get(1).equals(value.toString())) {
+ try {
+ LOG.warn("REDUCE Encountered BAD record");
+ //hang far longer than the 30 second mapred.task.timeout set by
+ //runMapReduce; presumably so that the attempt is timed out
+ Thread.sleep(15*60*1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ output.collect(key, value);
+ }
+
+ }
+ }
+
+
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSortedRanges.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSortedRanges.java?rev=684731&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSortedRanges.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSortedRanges.java Mon Aug 11 05:03:37 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.SortedRanges.Range;
+
+public class TestSortedRanges extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.mapred.TestSortedRanges");
+
+ public void testAdd() {
+ SortedRanges sr = new SortedRanges();
+ sr.add(new Range(2,9));
+ assertEquals(9, sr.getIndicesCount());
+
+ sr.add(new SortedRanges.Range(3,5));
+ assertEquals(9, sr.getIndicesCount());
+
+ sr.add(new SortedRanges.Range(7,1));
+ assertEquals(9, sr.getIndicesCount());
+
+ sr.add(new Range(1,12));
+ assertEquals(12, sr.getIndicesCount());
+
+ sr.add(new Range(7,9));
+ assertEquals(15, sr.getIndicesCount());
+
+ sr.add(new Range(31,10));
+ sr.add(new Range(51,10));
+ sr.add(new Range(66,10));
+ assertEquals(45, sr.getIndicesCount());
+
+ sr.add(new Range(21,50));
+ assertEquals(70, sr.getIndicesCount());
+
+ LOG.debug(sr);
+
+ Iterator<Long> it = sr.skipRangeIterator();
+ int i = 0;
+ assertEquals(i, it.next().longValue());
+ for(i=16;i<21;i++) {
+ assertEquals(i, it.next().longValue());
+ }
+ assertEquals(76, it.next().longValue());
+ assertEquals(77, it.next().longValue());
+
+ }
+
+ public void testRemove() {
+ SortedRanges sr = new SortedRanges();
+ sr.add(new Range(2,19));
+ assertEquals(19, sr.getIndicesCount());
+
+ sr.remove(new SortedRanges.Range(15,8));
+ assertEquals(13, sr.getIndicesCount());
+
+ sr.remove(new SortedRanges.Range(6,5));
+ assertEquals(8, sr.getIndicesCount());
+
+ sr.remove(new SortedRanges.Range(8,4));
+ assertEquals(7, sr.getIndicesCount());
+
+ sr.add(new Range(18,5));
+ assertEquals(12, sr.getIndicesCount());
+
+ sr.add(new Range(25,1));
+ assertEquals(13, sr.getIndicesCount());
+
+ sr.remove(new SortedRanges.Range(7,24));
+ assertEquals(4, sr.getIndicesCount());
+
+ sr.remove(new SortedRanges.Range(5,1));
+ assertEquals(3, sr.getIndicesCount());
+
+ LOG.debug(sr);
+ }
+
+}