You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/02/12 11:35:57 UTC

svn commit: r1243230 - in /hadoop/common/branches/branch-1.0: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java src/mapred/org/apache/hadoop/mapred/TaskTracker.java src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java

Author: mattf
Date: Sun Feb 12 10:35:57 2012
New Revision: 1243230

URL: http://svn.apache.org/viewvc?rev=1243230&view=rev
Log:
MAPREDUCE-3184. Add a thread to the TaskTracker which monitors for spinning Jetty selector threads, and shuts down the daemon when one is detected. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java
    hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java
Modified:
    hadoop/common/branches/branch-1.0/CHANGES.txt
    hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1243230&r1=1243229&r2=1243230&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sun Feb 12 10:35:57 2012
@@ -24,6 +24,10 @@ Release 1.0.1 - 2012.02.12
     HADOOP-8009. Create hadoop-client and hadoop-minicluster artifacts for
     downstream projects. (Alejandro Abdelnur via mattf)
 
+    MAPREDUCE-3184. Add a thread to the TaskTracker which monitors for
+    spinning Jetty selector threads, and shuts down the daemon when one is
+    detected. (todd)
+
   BUG FIXES
 
     HADOOP-7960. Port HADOOP-5203 to branch-1, build version comparison is too 

Added: hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java?rev=1243230&view=auto
==============================================================================
--- hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java (added)
+++ hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java Sun Feb 12 10:35:57 2012
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+
+/**
+ * Class that monitors for a certain class of Jetty bug known to
+ * affect TaskTrackers. In this type of bug, the Jetty selector
+ * thread starts spinning and using ~100% CPU while no actual
+ * HTTP content is being served. Given that this bug has been
+ * active in Jetty/JDK for a long time with no resolution in sight,
+ * this class provides a temporary workaround.
+ * 
+ * Upon detecting the selector thread spinning, it simply exits the
+ * JVM with a Fatal message.
+ */
+class JettyBugMonitor extends Thread {
+  private final static Log LOG = LogFactory.getLog(
+      JettyBugMonitor.class);
+
+  private static final ThreadMXBean threadBean =
+    ManagementFactory.getThreadMXBean();
+
+  private static final String CHECK_ENABLED_KEY =
+    "mapred.tasktracker.jetty.cpu.check.enabled";
+  private static final boolean CHECK_ENABLED_DEFAULT = true;
+  
+  static final String CHECK_INTERVAL_KEY =
+    "mapred.tasktracker.jetty.cpu.check.interval";
+  private static final long CHECK_INTERVAL_DEFAULT = 15*1000;
+  private long checkInterval; 
+  
+  private static final String WARN_THRESHOLD_KEY =
+    "mapred.tasktracker.jetty.cpu.threshold.warn";
+  private static final float WARN_THRESHOLD_DEFAULT = 0.50f;
+  private float warnThreshold;
+
+  private static final String FATAL_THRESHOLD_KEY =
+    "mapred.tasktracker.jetty.cpu.threshold.fatal";
+  private static final float FATAL_THRESHOLD_DEFAULT = 0.90f;
+  private float fatalThreshold;
+  
+  private boolean stopping = false;
+
+  /**
+   * Create the monitoring thread.
+   * @return null if thread CPU monitoring is not supported
+   */
+  public static JettyBugMonitor create(Configuration conf) {
+    if (!conf.getBoolean(CHECK_ENABLED_KEY, CHECK_ENABLED_DEFAULT))  {
+      return null;
+    }
+    
+    if (!threadBean.isThreadCpuTimeSupported()) {
+      LOG.info("Not starting monitor for Jetty bug since thread CPU time " +
+          "measurement is not supported by this JVM");
+      return null;
+    }
+    return new JettyBugMonitor(conf);
+  }
+  
+  JettyBugMonitor(Configuration conf) {
+    setName("Monitor for Jetty bugs");
+    setDaemon(true);
+    
+    this.warnThreshold = conf.getFloat(
+        WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
+    this.fatalThreshold = conf.getFloat(
+        FATAL_THRESHOLD_KEY, FATAL_THRESHOLD_DEFAULT);
+    this.checkInterval = conf.getLong(
+        CHECK_INTERVAL_KEY, CHECK_INTERVAL_DEFAULT);
+  }
+  
+  @Override
+  public void run() {
+    try {
+      doRun();
+    } catch (InterruptedException ie) {
+      if (!stopping) {
+        LOG.warn("Jetty monitor unexpectedly interrupted", ie);
+      }
+    } catch (Throwable t) {
+      LOG.error("Jetty bug monitor failed", t);
+    }
+    LOG.debug("JettyBugMonitor shutting down");
+  }
+  
+  private void doRun() throws InterruptedException {
+    List<Long> tids = waitForJettyThreads();
+    if (tids.isEmpty()) {
+      LOG.warn("Could not locate Jetty selector threads");
+      return;
+    }
+    try {
+      // monitorThreads() loops forever; it only exits by throwing
+      monitorThreads(tids);
+    } catch (ThreadNotRunningException tnre) {
+      // A monitored thread has exited (already logged at WARN by
+      // getCpuUsageNanos), so there is nothing left to watch.
+    }
+  }
+  
+  /**
+   * Monitor the given list of threads, summing their CPU usage.
+   * If the usage exceeds the configured threshold, aborts the JVM.
+   * @param tids thread ids to monitor
+   * @throws InterruptedException if interrupted
+   * @throws ThreadNotRunningException if one of the threads is no longer
+   *         running
+   */
+  private void monitorThreads(List<Long> tids)
+      throws InterruptedException, ThreadNotRunningException {
+    
+    long timeBefore = System.nanoTime();
+    long usageBefore = getCpuUsageNanos(tids);
+    while (true) {
+      Thread.sleep(checkInterval);
+      long usageAfter = getCpuUsageNanos(tids);
+      long timeAfter = System.nanoTime();
+
+      long delta = usageAfter - usageBefore;
+      double percentCpu = (double)delta / (timeAfter - timeBefore);
+      
+      String msg = String.format("Jetty CPU usage: %.1f%%", percentCpu * 100);
+      if (percentCpu > fatalThreshold) {
+        LOG.fatal(
+            "************************************************************\n" +
+            msg + ". This is greater than the fatal threshold " +
+            FATAL_THRESHOLD_KEY + ". Aborting JVM.\n" +
+            "************************************************************");
+        doAbort();
+      } else if (percentCpu > warnThreshold) {
+        LOG.warn(msg);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug(msg);
+      }
+
+      usageBefore = usageAfter;
+      timeBefore = timeAfter;
+    }
+  }
+  
+  protected void doAbort() {
+    Runtime.getRuntime().exit(1);
+  }
+
+  /**
+   * Wait for jetty selector threads to start, polling once per second
+   * for at most 30 attempts.
+   * @return the thread IDs found; empty if none appeared in time
+   * @throws InterruptedException if interrupted while sleeping
+   */
+  protected List<Long> waitForJettyThreads() throws InterruptedException {
+    List<Long> tids = new ArrayList<Long>();
+    for (int i = 0; tids.isEmpty() && i < 30; i++) {
+      Thread.sleep(1000);
+      tids = getJettyThreadIds();
+    }
+    return tids;
+  }
+
+  private static long getCpuUsageNanos(List<Long> tids)
+      throws ThreadNotRunningException {
+    long total = 0;
+    for (long tid : tids) {
+      long time = threadBean.getThreadCpuTime(tid);
+      if (time == -1) {
+        LOG.warn("Unable to monitor CPU usage for thread: " + tid);
+        throw new ThreadNotRunningException();
+      }
+      total += time;
+    }
+    return total;
+  }
+
+  static List<Long> getJettyThreadIds() {
+    List<Long> tids = new ArrayList<Long>();
+    long[] threadIds = threadBean.getAllThreadIds();
+    for (long tid : threadIds) {
+      if (isJettySelectorThread(tid)) {
+        tids.add(tid);
+      }
+    }
+    return tids;
+  }
+
+  /**
+   * @return true if the thread's stack trace contains Jetty's selector class,
+   * matched by name, not by class, as jetty uses a different classloader
+   */
+  private static boolean isJettySelectorThread(long tid) {
+    ThreadInfo info = threadBean.getThreadInfo(tid, 20);
+    if (info == null) {
+      return false; // thread exited after getAllThreadIds() listed it
+    }
+    String desc = "Thread #" + tid + " (" + info.getThreadName() + ") ";
+    String selectorClass = SelectChannelConnector.class.getName();
+    for (StackTraceElement stack : info.getStackTrace()) {
+      if (selectorClass.equals(stack.getClassName())) {
+        LOG.debug(desc + "is a Jetty selector thread.");
+        return true;
+      }
+    }
+    LOG.debug(desc + "is not a jetty thread");
+    return false;
+  }
+  
+  private static class ThreadNotRunningException extends Exception {
+    private static final long serialVersionUID = 1L;
+  }
+
+  public void shutdown() {
+    this.stopping  = true;
+    this.interrupt();
+  }
+}

Modified: hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1243230&r1=1243229&r2=1243230&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Feb 12 10:35:57 2012
@@ -382,6 +382,13 @@ public class TaskTracker implements MRCo
    * Handle to the specific instance of the {@link NodeHealthCheckerService}
    */
   private NodeHealthCheckerService healthChecker;
+
+  /**
+   * Thread which checks CPU usage of Jetty and shuts down the TT if it
+   * exceeds a configurable threshold.
+   */
+  private JettyBugMonitor jettyBugMonitor;
+
   
   /**
    * Configuration property for disk health check interval in milli seconds.
@@ -836,6 +843,9 @@ public class TaskTracker implements MRCo
       startHealthMonitor(this.fConf);
     }
     
+    // Start thread to monitor jetty bugs
+    startJettyBugMonitor();
+    
     oobHeartbeatOnTaskCompletion = 
       fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
     oobHeartbeatDamper = 
@@ -843,6 +853,13 @@ public class TaskTracker implements MRCo
           DEFAULT_OOB_HEARTBEAT_DAMPER);
   }
 
+  private void startJettyBugMonitor() {
+    jettyBugMonitor = JettyBugMonitor.create(fConf);
+    if (jettyBugMonitor != null) {
+      jettyBugMonitor.start();
+    }
+  }
+
   private void createInstrumentation() {
     Class<? extends TaskTrackerInstrumentation> metricsInst =
         getInstrumentationClass(fConf);
@@ -1370,6 +1387,10 @@ public class TaskTracker implements MRCo
       healthChecker.stop();
       healthChecker = null;
     }
+    if (jettyBugMonitor != null) {
+      jettyBugMonitor.shutdown();
+      jettyBugMonitor = null;
+    }
   }
 
   /**

Added: hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java?rev=1243230&view=auto
==============================================================================
--- hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java (added)
+++ hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java Sun Feb 12 10:35:57 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer;
+import org.junit.Test;
+
+
+public class TestJettyBugMonitor {
+  private final Configuration conf = new Configuration();
+  
+  /**
+   * Test that it can detect a running Jetty selector.
+   */
+  @Test(timeout=20000)
+  public void testGetJettyThreads() throws Exception {
+    JettyBugMonitor monitor = new JettyBugMonitor(conf);
+    
+    new File(System.getProperty("build.webapps", "build/webapps") + "/test"
+      ).mkdirs();
+    HttpServer server = new HttpServer("test", "0.0.0.0", 0, true);
+    server.start();
+    try {
+      List<Long> threads = monitor.waitForJettyThreads();
+      assertEquals(1, threads.size());
+    } finally {
+      server.stop();
+    }
+  }
+  
+  /**
+   * Test that the CPU monitoring can detect a spinning
+   * thread.
+   */
+  @Test(timeout=5000)
+  public void testMonitoring() throws Exception {
+    // Start a thread which sucks up CPU
+    BusyThread busyThread = new BusyThread();
+    busyThread.start();
+    final long tid = busyThread.getId();
+    // Latch which will be triggered when the jetty monitor
+    // wants to abort
+    final CountDownLatch abortLatch = new CountDownLatch(1);
+
+    conf.setLong(JettyBugMonitor.CHECK_INTERVAL_KEY, 1000);
+    JettyBugMonitor monitor = null;
+    try {
+      monitor = new JettyBugMonitor(conf) {
+        @Override
+        protected List<Long> waitForJettyThreads() {
+          return Collections.<Long>singletonList(tid);
+        }
+        @Override
+        protected void doAbort() {
+          abortLatch.countDown();
+          // signal abort to main thread
+        }
+      };
+      monitor.start();
+      
+      abortLatch.await();
+    } finally {
+      busyThread.done = true;
+      busyThread.join();
+      
+      if (monitor != null) {
+        monitor.shutdown();
+      }
+    }
+  }
+  
+  private static class BusyThread extends Thread {
+    private volatile boolean done = false;
+    
+    @Override
+    public void run() {
+      while (!done) {
+        // spin using up CPU
+      }
+    }
+  }
+}