You are viewing a plain-text version of this content; the link to the canonical version ("here") was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2012/03/18 04:26:57 UTC
svn commit: r1302059 - in /hadoop/common/branches/branch-1.0: ./ src/mapred/
src/mapred/org/apache/hadoop/mapred/ src/test/
src/test/org/apache/hadoop/mapred/
Author: bobby
Date: Sun Mar 18 03:26:56 2012
New Revision: 1302059
URL: http://svn.apache.org/viewvc?rev=1302059&view=rev
Log:
svn merge -c 1302058 branch-1 to branch-1.0 FIXES MAPREDUCE-3851. Allow more aggressive action on detection of the jetty issue (tgraves via bobby)
Added:
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/ShuffleExceptionTracker.java
- copied unchanged from r1302058, hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleExceptionTracker.java
Modified:
hadoop/common/branches/branch-1.0/ (props changed)
hadoop/common/branches/branch-1.0/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-1.0/src/mapred/ (props changed)
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-1.0/src/test/findbugsExcludeFile.xml
hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java
Propchange: hadoop/common/branches/branch-1.0/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/branch-1:r1302058
Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1302059&r1=1302058&r2=1302059&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sun Mar 18 03:26:56 2012
@@ -48,6 +48,9 @@ Release 1.0.2 - unreleased
HDFS-3101. Cannot read empty file using WebHDFS. (szetszwo)
+ MAPREDUCE-3851. Allow more aggressive action on detection of the jetty
+ issue (tgraves via bobby)
+
Release 1.0.1 - 2012.02.14
NEW FEATURES
Propchange: hadoop/common/branches/branch-1.0/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/branches/branch-1/CHANGES.txt:r1302058
Propchange: hadoop/common/branches/branch-1.0/src/mapred/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/branch-1/src/mapred:r1302058
Modified: hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1302059&r1=1302058&r2=1302059&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Mar 18 03:26:56 2012
@@ -411,6 +411,8 @@ public class TaskTracker implements MRCo
private ShuffleServerInstrumentation shuffleServerMetrics;
+ private ShuffleExceptionTracker shuffleExceptionTracking;
+
private TaskTrackerInstrumentation myInstrumentation = null;
public TaskTrackerInstrumentation getTaskTrackerInstrumentation() {
@@ -1467,9 +1469,33 @@ public class TaskTracker implements MRCo
conf.get("mapreduce.reduce.shuffle.catch.exception.stack.regex");
String exceptionMsgRegex =
conf.get("mapreduce.reduce.shuffle.catch.exception.message.regex");
+ // Fraction of shuffle exceptions (out of the sample size) seen before it's
+ // fatal - acceptable values are from 0 to 1.0; 0 disables the check.
+ // e.g. 0.3 means abort once 30% of the last <sample size> requests
+ // matched the exception.
+ float shuffleExceptionLimit =
+ conf.getFloat(
+ "mapreduce.reduce.shuffle.catch.exception.percent.limit.fatal", 0);
+ if ((shuffleExceptionLimit > 1) || (shuffleExceptionLimit < 0)) {
+ throw new IllegalArgumentException(
+ "mapreduce.reduce.shuffle.catch.exception.percent.limit.fatal "
+ + " must be between 0 and 1.0");
+ }
+
+ // The number of trailing requests we track, used for the fatal
+ // limit calculation
+ int shuffleExceptionSampleSize =
+ conf.getInt("mapreduce.reduce.shuffle.catch.exception.sample.size", 1000);
+ if (shuffleExceptionSampleSize <= 0) {
+ throw new IllegalArgumentException(
+ "mapreduce.reduce.shuffle.catch.exception.sample.size "
+ + " must be greater than 0");
+ }
+ shuffleExceptionTracking =
+ new ShuffleExceptionTracker(shuffleExceptionSampleSize, exceptionStackRegex,
+ exceptionMsgRegex, shuffleExceptionLimit );
- server.setAttribute("exceptionStackRegex", exceptionStackRegex);
- server.setAttribute("exceptionMsgRegex", exceptionMsgRegex);
+ server.setAttribute("shuffleExceptionTracking", shuffleExceptionTracking);
server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
@@ -3796,10 +3822,8 @@ public class TaskTracker implements MRCo
(ShuffleServerInstrumentation) context.getAttribute("shuffleServerMetrics");
TaskTracker tracker =
(TaskTracker) context.getAttribute("task.tracker");
- String exceptionStackRegex =
- (String) context.getAttribute("exceptionStackRegex");
- String exceptionMsgRegex =
- (String) context.getAttribute("exceptionMsgRegex");
+ ShuffleExceptionTracker shuffleExceptionTracking =
+ (ShuffleExceptionTracker) context.getAttribute("shuffleExceptionTracking");
verifyRequest(request, response, tracker, jobId);
@@ -3912,7 +3936,9 @@ public class TaskTracker implements MRCo
") failed :\n"+
StringUtils.stringifyException(ie));
log.warn(errorMsg);
- checkException(ie, exceptionMsgRegex, exceptionStackRegex, shuffleMetrics);
+ if (shuffleExceptionTracking.checkException(ie)) {
+ shuffleMetrics.exceptionsCaught();
+ }
if (isInputException) {
tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg);
}
@@ -3933,40 +3959,10 @@ public class TaskTracker implements MRCo
}
}
outStream.close();
+ shuffleExceptionTracking.success();
shuffleMetrics.successOutput();
}
- protected void checkException(IOException ie, String exceptionMsgRegex,
- String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
- // parse exception to see if it looks like a regular expression you
- // configure. If both msgRegex and StackRegex set then make sure both
- // match, otherwise only the one set has to match.
- if (exceptionMsgRegex != null) {
- String msg = ie.getMessage();
- if (msg == null || !msg.matches(exceptionMsgRegex)) {
- return;
- }
- }
- if (exceptionStackRegex != null
- && !checkStackException(ie, exceptionStackRegex)) {
- return;
- }
- shuffleMetrics.exceptionsCaught();
- }
-
- private boolean checkStackException(IOException ie,
- String exceptionStackRegex) {
- StackTraceElement[] stack = ie.getStackTrace();
-
- for (StackTraceElement elem : stack) {
- String stacktrace = elem.toString();
- if (stacktrace.matches(exceptionStackRegex)) {
- return true;
- }
- }
- return false;
- }
-
/**
* verify that request has correct HASH for the url
Modified: hadoop/common/branches/branch-1.0/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/test/findbugsExcludeFile.xml?rev=1302059&r1=1302058&r2=1302059&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-1.0/src/test/findbugsExcludeFile.xml Sun Mar 18 03:26:56 2012
@@ -132,4 +132,9 @@
<Method name="run" />
<Bug pattern="DM_EXIT" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapred.ShuffleExceptionTracker" />
+ <Method name="doAbort" />
+ <Bug pattern="DM_EXIT" />
+ </Match>
</FindBugsFilter>
Modified: hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java?rev=1302059&r1=1302058&r2=1302059&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java (original)
+++ hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java Sun Mar 18 03:26:56 2012
@@ -17,104 +17,197 @@
*/
package org.apache.hadoop.mapred;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.test.MetricsAsserts.*;
import org.junit.Test;
public class TestShuffleExceptionCount {
- public static class TestMapOutputServlet extends TaskTracker.MapOutputServlet {
+ static boolean abortCalled = false;
+ private final float epsilon = 1e-5f;
+
+ public static class TestShuffleExceptionTracker extends ShuffleExceptionTracker {
+ private static final long serialVersionUID = 1L;
- public void checkException(IOException ie, String exceptionMsgRegex,
- String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
- super.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
- shuffleMetrics);
+ TestShuffleExceptionTracker(int size, String exceptionStackRegex,
+ String exceptionMsgRegex, float shuffleExceptionLimit) {
+ super(size, exceptionStackRegex,
+ exceptionMsgRegex, shuffleExceptionLimit);
}
+ protected void doAbort() {
+ abortCalled = true;
+ }
}
@Test
public void testCheckException() throws IOException, InterruptedException {
- TestMapOutputServlet testServlet = new TestMapOutputServlet();
- JobConf conf = new JobConf();
- conf.setUser("testuser");
- conf.setJobName("testJob");
- conf.setSessionId("testSession");
-
- TaskTracker tt = new TaskTracker();
- tt.setConf(conf);
- ShuffleServerInstrumentation shuffleMetrics =
- ShuffleServerInstrumentation.create(tt);
// first test with only MsgRegex set but doesn't match
String exceptionMsgRegex = "Broken pipe";
String exceptionStackRegex = null;
+ TestShuffleExceptionTracker shuffleExceptionTracker = new TestShuffleExceptionTracker(
+ 10, exceptionStackRegex, exceptionMsgRegex, 0f);
IOException ie = new IOException("EOFException");
- testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
- shuffleMetrics);
- MetricsRecordBuilder rb = getMetrics(shuffleMetrics);
- assertCounter("shuffle_exceptions_caught", 0, rb);
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
// test with only MsgRegex set that does match
ie = new IOException("Broken pipe");
exceptionStackRegex = null;
- testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
- shuffleMetrics);
- rb = getMetrics(shuffleMetrics);
- assertCounter("shuffle_exceptions_caught", 1, rb);
+ shuffleExceptionTracker = new TestShuffleExceptionTracker(
+ 10, exceptionStackRegex, exceptionMsgRegex, 0f);
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
// test with neither set, make sure incremented
exceptionMsgRegex = null;
exceptionStackRegex = null;
- testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
- shuffleMetrics);
- rb = getMetrics(shuffleMetrics);
- assertCounter("shuffle_exceptions_caught", 2, rb);
+ shuffleExceptionTracker = new TestShuffleExceptionTracker(
+ 10, exceptionStackRegex, exceptionMsgRegex, 0f);
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
// test with only StackRegex set doesn't match
exceptionMsgRegex = null;
exceptionStackRegex = ".*\\.doesnt\\$SelectSet\\.wakeup.*";
ie.setStackTrace(constructStackTrace());
- testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
- shuffleMetrics);
- rb = getMetrics(shuffleMetrics);
- assertCounter("shuffle_exceptions_caught", 2, rb);
+ shuffleExceptionTracker = new TestShuffleExceptionTracker(
+ 10, exceptionStackRegex, exceptionMsgRegex, 0f);
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
// test with only StackRegex set does match
exceptionMsgRegex = null;
exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
- testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
- shuffleMetrics);
- rb = getMetrics(shuffleMetrics);
- assertCounter("shuffle_exceptions_caught", 3, rb);
+ shuffleExceptionTracker = new TestShuffleExceptionTracker(
+ 10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
// test with both regex set and matches
exceptionMsgRegex = "Broken pipe";
ie.setStackTrace(constructStackTraceTwo());
- testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
- shuffleMetrics);
- rb = getMetrics(shuffleMetrics);
- assertCounter("shuffle_exceptions_caught", 4, rb);
+ shuffleExceptionTracker = new TestShuffleExceptionTracker(
+ 10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
// test with both regex set and only msg matches
exceptionStackRegex = ".*[1-9]+BOGUSREGEX";
- testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
- shuffleMetrics);
- rb = getMetrics(shuffleMetrics);
- assertCounter("shuffle_exceptions_caught", 4, rb);
+ shuffleExceptionTracker = new TestShuffleExceptionTracker(
+ 10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
// test with both regex set and only stack matches
exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
exceptionMsgRegex = "EOFException";
- testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
- shuffleMetrics);
- rb = getMetrics(shuffleMetrics);
- assertCounter("shuffle_exceptions_caught", 4, rb);
+ shuffleExceptionTracker = new TestShuffleExceptionTracker(
+ 10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
+
+ exceptionMsgRegex = "Broken pipe";
+ ie.setStackTrace(constructStackTraceTwo());
+ shuffleExceptionTracker = new TestShuffleExceptionTracker(
+ 10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
+ }
+
+ @Test
+ public void testExceptionCount() {
+ String exceptionMsgRegex = "Broken pipe";
+ String exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+ IOException ie = new IOException("Broken pipe");
+ ie.setStackTrace(constructStackTraceTwo());
+
+ TestShuffleExceptionTracker shuffleExceptionTracker = new TestShuffleExceptionTracker(
+ 10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
+ assertEquals("shuffleExceptionCount wrong", (float) 1 / (float) 10,
+ shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+ ie.setStackTrace(constructStackTraceThree());
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
+ assertEquals("shuffleExceptionCount wrong", (float) 1 / (float) 10,
+ shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
+ assertEquals("shuffleExceptionCount wrong", (float) 1 / (float) 10,
+ shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+ ie.setStackTrace(constructStackTrace());
+ shuffleExceptionTracker.checkException(ie);
+ assertFalse("abort called when set to off", abortCalled);
+ assertEquals("shuffleExceptionCount wrong", (float) 2 / (float) 10,
+ shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+ shuffleExceptionTracker.checkException(ie);
+ assertTrue("abort not called", abortCalled);
+ assertEquals("shuffleExceptionCount wrong", (float) 3 / (float) 10,
+ shuffleExceptionTracker.getPercentExceptions(), epsilon);
}
+ @Test
+ public void testShuffleExceptionTrailing() {
+ String exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+ String exceptionMsgRegex = "Broken pipe";
+ int size = 5;
+ ShuffleExceptionTracker tracker = new ShuffleExceptionTracker(
+ size, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+ assertEquals(size, tracker.getNumRequests());
+ assertEquals(0, tracker.getPercentExceptions(), 0);
+ tracker.success();
+ assertEquals(0, tracker.getPercentExceptions(), 0);
+ tracker.exception();
+ assertEquals((float) 1 / (float) size, tracker.getPercentExceptions(), epsilon);
+ tracker.exception();
+ tracker.exception();
+ assertEquals((float) 3 / (float) size, tracker.getPercentExceptions(), epsilon);
+ tracker.exception();
+ tracker.exception();
+ tracker.exception();
+ tracker.exception();
+ assertEquals((float) 5 / (float) size, tracker.getPercentExceptions(), epsilon);
+ // make sure we push out old ones
+ tracker.success();
+ tracker.success();
+ assertEquals((float) 3 / (float) size, tracker.getPercentExceptions(), epsilon);
+ tracker.exception();
+ tracker.exception();
+ tracker.exception();
+ tracker.exception();
+ tracker.exception();
+ assertEquals((float) 5 / (float) size, tracker.getPercentExceptions(), epsilon);
+ }
+
+ @Test
+ public void testShuffleExceptionTrailingSize() {
+ String exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+ String exceptionMsgRegex = "Broken pipe";
+ int size = 1000;
+ ShuffleExceptionTracker tracker = new ShuffleExceptionTracker(
+ size, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+ assertEquals(size, tracker.getNumRequests());
+ tracker.success();
+ tracker.success();
+ tracker.exception();
+ tracker.exception();
+ assertEquals((float) 2 / (float) size, tracker.getPercentExceptions(),
+ epsilon);
+ }
+
+
/*
* Construction exception like:
* java.io.IOException: Broken pipe at
@@ -174,4 +267,18 @@ public class TestShuffleExceptionCount {
return stack;
}
+ /*
+ * java.io.IOException: Broken pipe at
+ * sun.nio.ch.EPollArrayWrapper.interrupt(Native Method) at
+ * sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256) at
+ * sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175) at
+ */
+ private StackTraceElement[] constructStackTraceThree() {
+ StackTraceElement[] stack = new StackTraceElement[3];
+ stack[0] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "", -2);
+ stack[1] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "EPollArrayWrapper.java", 256);
+ stack[2] = new StackTraceElement("sun.nio.ch.EPollSelectorImpl", "wakeup", "EPollSelectorImpl.java", 175);
+
+ return stack;
+}
}