You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this version.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2012/03/18 04:26:57 UTC

svn commit: r1302059 - in /hadoop/common/branches/branch-1.0: ./ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/ src/test/org/apache/hadoop/mapred/

Author: bobby
Date: Sun Mar 18 03:26:56 2012
New Revision: 1302059

URL: http://svn.apache.org/viewvc?rev=1302059&view=rev
Log:
svn merge -c 1302058 branch-1 to branch-1.0 FIXES MAPREDUCE-3851.  Allow more aggressive action on detection of the jetty issue (tgraves via bobby)

Added:
    hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/ShuffleExceptionTracker.java
      - copied unchanged from r1302058, hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleExceptionTracker.java
Modified:
    hadoop/common/branches/branch-1.0/   (props changed)
    hadoop/common/branches/branch-1.0/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-1.0/src/mapred/   (props changed)
    hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-1.0/src/test/findbugsExcludeFile.xml
    hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java

Propchange: hadoop/common/branches/branch-1.0/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1:r1302058

Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1302059&r1=1302058&r2=1302059&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sun Mar 18 03:26:56 2012
@@ -48,6 +48,9 @@ Release 1.0.2 - unreleased
 
     HDFS-3101. Cannot read empty file using WebHDFS.  (szetszwo)
 
+    MAPREDUCE-3851.  Allow more aggressive action on detection of the jetty
+    issue (tgraves via bobby)
+
 Release 1.0.1 - 2012.02.14
 
   NEW FEATURES

Propchange: hadoop/common/branches/branch-1.0/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1/CHANGES.txt:r1302058

Propchange: hadoop/common/branches/branch-1.0/src/mapred/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1/src/mapred:r1302058

Modified: hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1302059&r1=1302058&r2=1302059&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Mar 18 03:26:56 2012
@@ -411,6 +411,8 @@ public class TaskTracker implements MRCo
 
   private ShuffleServerInstrumentation shuffleServerMetrics;
 
+  private ShuffleExceptionTracker shuffleExceptionTracking;
+
   private TaskTrackerInstrumentation myInstrumentation = null;
 
   public TaskTrackerInstrumentation getTaskTrackerInstrumentation() {
@@ -1467,9 +1469,33 @@ public class TaskTracker implements MRCo
       conf.get("mapreduce.reduce.shuffle.catch.exception.stack.regex");
     String exceptionMsgRegex =
       conf.get("mapreduce.reduce.shuffle.catch.exception.message.regex");
+    // Percent of shuffle exceptions (out of sample size) seen before it's
+    // fatal - acceptable values are from 0 to 1.0, 0 disables the check.
+    // ie. 0.3 = 30% of the last X number of requests matched the exception,
+    // so abort.
+    float shuffleExceptionLimit =
+      conf.getFloat(
+          "mapreduce.reduce.shuffle.catch.exception.percent.limit.fatal", 0);
+    if ((shuffleExceptionLimit > 1) || (shuffleExceptionLimit < 0)) {
+      throw new IllegalArgumentException(
+          "mapreduce.reduce.shuffle.catch.exception.percent.limit.fatal "
+              + " must be between 0 and 1.0");
+    }
+
+    // The number of trailing requests we track, used for the fatal
+    // limit calculation
+    int shuffleExceptionSampleSize =
+      conf.getInt("mapreduce.reduce.shuffle.catch.exception.sample.size", 1000);
+    if (shuffleExceptionSampleSize <= 0) {
+      throw new IllegalArgumentException(
+          "mapreduce.reduce.shuffle.catch.exception.sample.size "
+              + " must be greater than 0");
+    }
+    shuffleExceptionTracking =
+      new ShuffleExceptionTracker(shuffleExceptionSampleSize, exceptionStackRegex,
+          exceptionMsgRegex, shuffleExceptionLimit );
 
-    server.setAttribute("exceptionStackRegex", exceptionStackRegex);
-    server.setAttribute("exceptionMsgRegex", exceptionMsgRegex);
+    server.setAttribute("shuffleExceptionTracking", shuffleExceptionTracking);
 
     server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
     server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
@@ -3796,10 +3822,8 @@ public class TaskTracker implements MRCo
         (ShuffleServerInstrumentation) context.getAttribute("shuffleServerMetrics");
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
-      String exceptionStackRegex =
-        (String) context.getAttribute("exceptionStackRegex");
-      String exceptionMsgRegex =
-        (String) context.getAttribute("exceptionMsgRegex");
+      ShuffleExceptionTracker shuffleExceptionTracking =
+        (ShuffleExceptionTracker) context.getAttribute("shuffleExceptionTracking");
 
       verifyRequest(request, response, tracker, jobId);
 
@@ -3912,7 +3936,9 @@ public class TaskTracker implements MRCo
                            ") failed :\n"+
                            StringUtils.stringifyException(ie));
         log.warn(errorMsg);
-        checkException(ie, exceptionMsgRegex, exceptionStackRegex, shuffleMetrics);
+        if (shuffleExceptionTracking.checkException(ie)) {
+          shuffleMetrics.exceptionsCaught();
+        }
         if (isInputException) {
           tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg);
         }
@@ -3933,40 +3959,10 @@ public class TaskTracker implements MRCo
         }
       }
       outStream.close();
+      shuffleExceptionTracking.success();
       shuffleMetrics.successOutput();
     }
     
-    protected void checkException(IOException ie, String exceptionMsgRegex,
-        String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
-      // parse exception to see if it looks like a regular expression you
-      // configure. If both msgRegex and StackRegex set then make sure both
-      // match, otherwise only the one set has to match.
-      if (exceptionMsgRegex != null) {
-        String msg = ie.getMessage();
-        if (msg == null || !msg.matches(exceptionMsgRegex)) {
-          return;
-        }
-      }
-      if (exceptionStackRegex != null
-          && !checkStackException(ie, exceptionStackRegex)) {
-        return;
-      }
-      shuffleMetrics.exceptionsCaught();
-    }
-
-    private boolean checkStackException(IOException ie,
-        String exceptionStackRegex) {
-      StackTraceElement[] stack = ie.getStackTrace();
-
-      for (StackTraceElement elem : stack) {
-        String stacktrace = elem.toString();
-        if (stacktrace.matches(exceptionStackRegex)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
 
     /**
      * verify that request has correct HASH for the url

Modified: hadoop/common/branches/branch-1.0/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/test/findbugsExcludeFile.xml?rev=1302059&r1=1302058&r2=1302059&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-1.0/src/test/findbugsExcludeFile.xml Sun Mar 18 03:26:56 2012
@@ -132,4 +132,9 @@
        <Method name="run" />
        <Bug pattern="DM_EXIT" />
     </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.ShuffleExceptionTracker" />
+       <Method name="doAbort" />
+       <Bug pattern="DM_EXIT" />
+    </Match>
 </FindBugsFilter>

Modified: hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java?rev=1302059&r1=1302058&r2=1302059&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java (original)
+++ hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java Sun Mar 18 03:26:56 2012
@@ -17,104 +17,197 @@
  */
 package org.apache.hadoop.mapred;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.test.MetricsAsserts.*;
 
 import org.junit.Test;
 
 public class TestShuffleExceptionCount {
 
-  public static class TestMapOutputServlet extends TaskTracker.MapOutputServlet {
+  static boolean abortCalled = false;
+  private final float epsilon = 1e-5f;
+
+  public static class TestShuffleExceptionTracker extends ShuffleExceptionTracker {
+    private static final long serialVersionUID = 1L;
 
-    public void checkException(IOException ie, String exceptionMsgRegex,
-        String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
-      super.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-          shuffleMetrics);
+    TestShuffleExceptionTracker(int size, String exceptionStackRegex,
+        String exceptionMsgRegex, float shuffleExceptionLimit) {
+      super(size, exceptionStackRegex,
+          exceptionMsgRegex, shuffleExceptionLimit);
     }
 
+    protected void doAbort() {
+      abortCalled = true;
+  }
   }
 
   @Test
   public void testCheckException() throws IOException, InterruptedException {
-    TestMapOutputServlet testServlet = new TestMapOutputServlet();
-    JobConf conf = new JobConf();
-    conf.setUser("testuser");
-    conf.setJobName("testJob");
-    conf.setSessionId("testSession");
-
-    TaskTracker tt = new TaskTracker();
-    tt.setConf(conf);
-    ShuffleServerInstrumentation shuffleMetrics =
-      ShuffleServerInstrumentation.create(tt);
 
     // first test with only MsgRegex set but doesn't match
     String exceptionMsgRegex = "Broken pipe";
     String exceptionStackRegex = null;
+    TestShuffleExceptionTracker shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0f);
     IOException ie = new IOException("EOFException");
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    MetricsRecordBuilder rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 0, rb);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with only MsgRegex set that does match
     ie = new IOException("Broken pipe");
     exceptionStackRegex = null;
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 1, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with neither set, make sure incremented
     exceptionMsgRegex = null;
     exceptionStackRegex = null;
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 2, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with only StackRegex set doesn't match
     exceptionMsgRegex = null;
     exceptionStackRegex = ".*\\.doesnt\\$SelectSet\\.wakeup.*";
     ie.setStackTrace(constructStackTrace());
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 2, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with only StackRegex set does match
     exceptionMsgRegex = null;
     exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 3, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with both regex set and matches
     exceptionMsgRegex = "Broken pipe";
     ie.setStackTrace(constructStackTraceTwo());
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 4, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with both regex set and only msg matches
     exceptionStackRegex = ".*[1-9]+BOGUSREGEX";
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 4, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with both regex set and only stack matches
     exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
     exceptionMsgRegex = "EOFException";
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 4, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+
+    exceptionMsgRegex = "Broken pipe";
+    ie.setStackTrace(constructStackTraceTwo());
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+  }
+
+  @Test
+  public void testExceptionCount() {
+    String exceptionMsgRegex = "Broken pipe";
+    String exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    IOException ie = new IOException("Broken pipe");
+    ie.setStackTrace(constructStackTraceTwo());
+
+    TestShuffleExceptionTracker shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+    assertEquals("shuffleExceptionCount wrong", (float) 1 / (float) 10,
+        shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+    ie.setStackTrace(constructStackTraceThree());
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+    assertEquals("shuffleExceptionCount wrong", (float) 1 / (float) 10,
+        shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+    assertEquals("shuffleExceptionCount wrong", (float) 1 / (float) 10,
+        shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+    ie.setStackTrace(constructStackTrace());
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+    assertEquals("shuffleExceptionCount wrong", (float) 2 / (float) 10,
+        shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+    shuffleExceptionTracker.checkException(ie);
+    assertTrue("abort not called", abortCalled);
+    assertEquals("shuffleExceptionCount wrong", (float) 3 / (float) 10,
+        shuffleExceptionTracker.getPercentExceptions(), epsilon);
 
   }
 
+  @Test
+  public void testShuffleExceptionTrailing() {
+    String exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    String exceptionMsgRegex = "Broken pipe";
+    int size = 5;
+    ShuffleExceptionTracker tracker = new ShuffleExceptionTracker(
+        size, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    assertEquals(size, tracker.getNumRequests());
+    assertEquals(0, tracker.getPercentExceptions(), 0);
+    tracker.success();
+    assertEquals(0, tracker.getPercentExceptions(), 0);
+    tracker.exception();
+    assertEquals((float) 1 / (float) size, tracker.getPercentExceptions(), epsilon);
+    tracker.exception();
+    tracker.exception();
+    assertEquals((float) 3 / (float) size, tracker.getPercentExceptions(), epsilon);
+    tracker.exception();
+    tracker.exception();
+    tracker.exception();
+    tracker.exception();
+    assertEquals((float) 5 / (float) size, tracker.getPercentExceptions(), epsilon);
+    // make sure we push out old ones
+    tracker.success();
+    tracker.success();
+    assertEquals((float) 3 / (float) size, tracker.getPercentExceptions(), epsilon);
+    tracker.exception();
+    tracker.exception();
+    tracker.exception();
+    tracker.exception();
+    tracker.exception();
+    assertEquals((float) 5 / (float) size, tracker.getPercentExceptions(), epsilon);
+  }
+
+  @Test
+  public void testShuffleExceptionTrailingSize() {
+    String exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    String exceptionMsgRegex = "Broken pipe";
+    int size = 1000;
+    ShuffleExceptionTracker tracker = new ShuffleExceptionTracker(
+        size, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    assertEquals(size, tracker.getNumRequests());
+    tracker.success();
+    tracker.success();
+    tracker.exception();
+    tracker.exception();
+    assertEquals((float) 2 / (float) size, tracker.getPercentExceptions(),
+        epsilon);
+  }
+
+
   /*
    * Construction exception like:
    * java.io.IOException: Broken pipe at
@@ -174,4 +267,18 @@ public class TestShuffleExceptionCount {
     return stack;
   }
 
+  /*
+   * java.io.IOException: Broken pipe at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(Native Method) at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256) at
+   * sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175) at
+   */
+  private StackTraceElement[] constructStackTraceThree() {
+    StackTraceElement[] stack = new StackTraceElement[3];
+    stack[0] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "", -2);
+    stack[1] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "EPollArrayWrapper.java", 256);
+    stack[2] = new StackTraceElement("sun.nio.ch.EPollSelectorImpl", "wakeup", "EPollSelectorImpl.java", 175);
+
+    return stack;
+}
 }