You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/02/12 11:35:57 UTC
svn commit: r1243230 - in /hadoop/common/branches/branch-1.0: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java
src/mapred/org/apache/hadoop/mapred/TaskTracker.java
src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java
Author: mattf
Date: Sun Feb 12 10:35:57 2012
New Revision: 1243230
URL: http://svn.apache.org/viewvc?rev=1243230&view=rev
Log:
MAPREDUCE-3184. Add a thread to the TaskTracker which monitors for spinning Jetty selector threads, and shuts down the daemon when one is detected. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java
hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java
Modified:
hadoop/common/branches/branch-1.0/CHANGES.txt
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1243230&r1=1243229&r2=1243230&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sun Feb 12 10:35:57 2012
@@ -24,6 +24,10 @@ Release 1.0.1 - 2012.02.12
HADOOP-8009. Create hadoop-client and hadoop-minicluster artifacts for
downstream projects. (Alejandro Abdelnur via mattf)
+ MAPREDUCE-3184. Add a thread to the TaskTracker which monitors for
+ spinning Jetty selector threads, and shuts down the daemon when one is
+ detected. (todd)
+
BUG FIXES
HADOOP-7960. Port HADOOP-5203 to branch-1, build version comparison is too
Added: hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java?rev=1243230&view=auto
==============================================================================
--- hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java (added)
+++ hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JettyBugMonitor.java Sun Feb 12 10:35:57 2012
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+
+/**
+ * Class that monitors for a certain class of Jetty bug known to
+ * affect TaskTrackers. In this type of bug, the Jetty selector
+ * thread starts spinning and using ~100% CPU while no actual
+ * HTTP content is being served. Given that this bug has been
+ * active in Jetty/JDK for a long time with no resolution in sight,
+ * this class provides a temporary workaround.
+ *
+ * Upon detecting the selector thread spinning, it logs a fatal
+ * message and exits the JVM (see {@link #doAbort()}).
+ *
+ * Configuration keys:
+ * <ul>
+ *   <li>mapred.tasktracker.jetty.cpu.check.enabled (default true)</li>
+ *   <li>mapred.tasktracker.jetty.cpu.check.interval, in milliseconds
+ *       (default 15000)</li>
+ *   <li>mapred.tasktracker.jetty.cpu.threshold.warn (default 0.50)</li>
+ *   <li>mapred.tasktracker.jetty.cpu.threshold.fatal (default 0.90)</li>
+ * </ul>
+ * Thresholds are compared against the combined CPU time of all selector
+ * threads divided by the wall-clock time of one check interval, i.e. a
+ * fraction of a single CPU.
+ */
+class JettyBugMonitor extends Thread {
+  private final static Log LOG = LogFactory.getLog(
+      JettyBugMonitor.class);
+
+  private static final ThreadMXBean threadBean =
+      ManagementFactory.getThreadMXBean();
+
+  private static final String CHECK_ENABLED_KEY =
+      "mapred.tasktracker.jetty.cpu.check.enabled";
+  private static final boolean CHECK_ENABLED_DEFAULT = true;
+
+  static final String CHECK_INTERVAL_KEY =
+      "mapred.tasktracker.jetty.cpu.check.interval";
+  private static final long CHECK_INTERVAL_DEFAULT = 15*1000;
+  /** Milliseconds slept between two CPU usage samples. */
+  private final long checkInterval;
+
+  private static final String WARN_THRESHOLD_KEY =
+      "mapred.tasktracker.jetty.cpu.threshold.warn";
+  private static final float WARN_THRESHOLD_DEFAULT = 0.50f;
+  private final float warnThreshold;
+
+  private static final String FATAL_THRESHOLD_KEY =
+      "mapred.tasktracker.jetty.cpu.threshold.fatal";
+  private static final float FATAL_THRESHOLD_DEFAULT = 0.90f;
+  private final float fatalThreshold;
+
+  /** Number of one-second polls made while waiting for Jetty to start. */
+  private static final int JETTY_THREAD_WAIT_ATTEMPTS = 30;
+
+  /**
+   * Set by {@link #shutdown()} so the interrupt it sends is not reported
+   * as unexpected. Volatile because it is written by the thread calling
+   * shutdown() and read by the monitor thread.
+   */
+  private volatile boolean stopping = false;
+
+  /**
+   * Create the monitoring thread. The returned thread is not started.
+   * @param conf configuration supplying the enable flag, check interval
+   *        and thresholds
+   * @return the monitor, or null if the check is disabled by configuration
+   *         or thread CPU time measurement is not supported or not enabled
+   *         in this JVM
+   */
+  public static JettyBugMonitor create(Configuration conf) {
+    if (!conf.getBoolean(CHECK_ENABLED_KEY, CHECK_ENABLED_DEFAULT)) {
+      return null;
+    }
+
+    if (!threadBean.isThreadCpuTimeSupported()) {
+      LOG.info("Not starting monitor for Jetty bug since thread CPU time " +
+          "measurement is not supported by this JVM");
+      return null;
+    }
+    // When measurement is disabled getThreadCpuTime() returns -1, which
+    // would make the monitor give up on its very first sample.
+    if (!threadBean.isThreadCpuTimeEnabled()) {
+      LOG.info("Not starting monitor for Jetty bug since thread CPU time " +
+          "measurement is disabled in this JVM");
+      return null;
+    }
+    return new JettyBugMonitor(conf);
+  }
+
+  /**
+   * Build a daemon monitor thread, reading thresholds and the check
+   * interval from the given configuration.
+   * @param conf configuration to read settings from
+   */
+  JettyBugMonitor(Configuration conf) {
+    setName("Monitor for Jetty bugs");
+    setDaemon(true);
+
+    this.warnThreshold = conf.getFloat(
+        WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
+    this.fatalThreshold = conf.getFloat(
+        FATAL_THRESHOLD_KEY, FATAL_THRESHOLD_DEFAULT);
+    this.checkInterval = conf.getLong(
+        CHECK_INTERVAL_KEY, CHECK_INTERVAL_DEFAULT);
+  }
+
+  /**
+   * Thread entry point. Runs the monitor loop and logs (rather than
+   * propagates) any failure, so a monitor problem never takes down the
+   * daemon by itself.
+   */
+  @Override
+  public void run() {
+    try {
+      doRun();
+    } catch (InterruptedException ie) {
+      if (!stopping) {
+        LOG.warn("Jetty monitor unexpectedly interrupted", ie);
+      }
+    } catch (Throwable t) {
+      LOG.error("Jetty bug monitor failed", t);
+    }
+    LOG.debug("JettyBugMonitor shutting down");
+  }
+
+  /**
+   * Locate the selector threads and monitor them until one stops running.
+   * @throws InterruptedException if interrupted while waiting or sleeping
+   */
+  private void doRun() throws InterruptedException {
+    List<Long> tids = waitForJettyThreads();
+    if (tids.isEmpty()) {
+      LOG.warn("Could not locate Jetty selector threads");
+      return;
+    }
+    while (true) {
+      try {
+        monitorThreads(tids);
+      } catch (ThreadNotRunningException tnre) {
+        return;
+      }
+    }
+  }
+
+  /**
+   * Monitor the given list of threads, summing their CPU usage.
+   * Every checkInterval ms the combined CPU time consumed since the
+   * previous sample is divided by the elapsed wall-clock time. Above the
+   * fatal threshold {@link #doAbort()} is invoked; above the warn threshold
+   * a warning is logged. This method never returns normally.
+   * @param tids thread ids to monitor
+   * @throws InterruptedException if interrupted
+   * @throws ThreadNotRunningException if one of the threads is no longer
+   *         running
+   */
+  private void monitorThreads(List<Long> tids)
+      throws InterruptedException, ThreadNotRunningException {
+
+    long timeBefore = System.nanoTime();
+    long usageBefore = getCpuUsageNanos(tids);
+    while (true) {
+      Thread.sleep(checkInterval);
+      long usageAfter = getCpuUsageNanos(tids);
+      long timeAfter = System.nanoTime();
+
+      long delta = usageAfter - usageBefore;
+      // Fraction of one CPU used by the selector threads in this interval.
+      double cpuFraction = (double)delta / (timeAfter - timeBefore);
+
+      String msg = String.format("Jetty CPU usage: %.1f%%", cpuFraction * 100);
+      if (cpuFraction > fatalThreshold) {
+        LOG.fatal(
+            "************************************************************\n" +
+            msg + ". This is greater than the fatal threshold " +
+            FATAL_THRESHOLD_KEY + ". Aborting JVM.\n" +
+            "************************************************************");
+        doAbort();
+      } else if (cpuFraction > warnThreshold) {
+        LOG.warn(msg);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug(msg);
+      }
+
+      usageBefore = usageAfter;
+      timeBefore = timeAfter;
+    }
+  }
+
+  /**
+   * Terminate the JVM with exit status 1. Overridable so tests can
+   * observe the abort without exiting.
+   */
+  protected void doAbort() {
+    Runtime.getRuntime().exit(1);
+  }
+
+  /**
+   * Wait for jetty selector threads to start, polling once per second
+   * for up to {@value #JETTY_THREAD_WAIT_ATTEMPTS} attempts.
+   * @return the list of thread IDs; empty if none were found in time
+   * @throws InterruptedException if interrupted
+   */
+  protected List<Long> waitForJettyThreads() throws InterruptedException {
+    List<Long> tids = new ArrayList<Long>();
+    int attempts = 0;
+    while (tids.isEmpty() && attempts++ < JETTY_THREAD_WAIT_ATTEMPTS) {
+      Thread.sleep(1000);
+      tids = getJettyThreadIds();
+    }
+    return tids;
+  }
+
+  /**
+   * Sum the CPU time of the given threads.
+   * @param tids thread ids to sample
+   * @return total CPU time in nanoseconds
+   * @throws ThreadNotRunningException if the CPU time of any thread is
+   *         unavailable (getThreadCpuTime returned -1)
+   */
+  private static long getCpuUsageNanos(List<Long> tids)
+      throws ThreadNotRunningException {
+    long total = 0;
+    for (long tid : tids) {
+      long time = threadBean.getThreadCpuTime(tid);
+      if (time == -1) {
+        LOG.warn("Unable to monitor CPU usage for thread: " + tid);
+        throw new ThreadNotRunningException();
+      }
+      total += time;
+    }
+    return total;
+  }
+
+  /**
+   * @return the ids of all live threads whose stack trace identifies them
+   *         as Jetty selector threads
+   */
+  static List<Long> getJettyThreadIds() {
+    List<Long> tids = new ArrayList<Long>();
+    long[] threadIds = threadBean.getAllThreadIds();
+    for (long tid : threadIds) {
+      if (isJettySelectorThread(tid)) {
+        tids.add(tid);
+      }
+    }
+    return tids;
+  }
+
+  /**
+   * @return true if the given thread ID appears to be a Jetty selector thread
+   *         based on the top 20 frames of its stack trace; false if it does
+   *         not, or if the thread is no longer alive
+   */
+  private static boolean isJettySelectorThread(long tid) {
+    ThreadInfo info = threadBean.getThreadInfo(tid, 20);
+    if (info == null) {
+      // The thread exited after getAllThreadIds() reported it.
+      return false;
+    }
+    for (StackTraceElement stack : info.getStackTrace()) {
+      // compare class names instead of classes, since
+      // jetty uses a different classloader
+      if (SelectChannelConnector.class.getName().equals(
+          stack.getClassName())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Thread #" + tid + " (" + info.getThreadName() + ") " +
+              "is a Jetty selector thread.");
+        }
+        return true;
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Thread #" + tid + " (" + info.getThreadName() + ") " +
+          "is not a jetty thread");
+    }
+    return false;
+  }
+
+  /** Signals that a monitored thread's CPU time could not be read. */
+  private static class ThreadNotRunningException extends Exception {
+    private static final long serialVersionUID = 1L;
+  }
+
+  /**
+   * Ask the monitor thread to exit. It is marked as stopping and then
+   * interrupted, so the resulting InterruptedException is not logged as
+   * unexpected.
+   */
+  public void shutdown() {
+    this.stopping = true;
+    this.interrupt();
+  }
+}
Modified: hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1243230&r1=1243229&r2=1243230&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Feb 12 10:35:57 2012
@@ -382,6 +382,13 @@ public class TaskTracker implements MRCo
* Handle to the specific instance of the {@link NodeHealthCheckerService}
*/
private NodeHealthCheckerService healthChecker;
+
+ /**
+ * Thread which checks CPU usage of the Jetty selector threads and shuts
+ * down the TT if it exceeds a configurable threshold. Stays null when
+ * JettyBugMonitor.create() returns null (e.g. the check is disabled by
+ * configuration), and is reset to null once the monitor is shut down.
+ */
+ private JettyBugMonitor jettyBugMonitor;
+
/**
* Configuration property for disk health check interval in milli seconds.
@@ -836,6 +843,9 @@ public class TaskTracker implements MRCo
startHealthMonitor(this.fConf);
}
+ // Start thread to monitor jetty bugs
+ startJettyBugMonitor();
+
oobHeartbeatOnTaskCompletion =
fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
oobHeartbeatDamper =
@@ -843,6 +853,13 @@ public class TaskTracker implements MRCo
DEFAULT_OOB_HEARTBEAT_DAMPER);
}
+  /**
+   * Create the Jetty bug monitor from the TaskTracker configuration,
+   * remember it in {@code jettyBugMonitor}, and start it if one was created.
+   */
+  private void startJettyBugMonitor() {
+    JettyBugMonitor monitor = JettyBugMonitor.create(fConf);
+    jettyBugMonitor = monitor;
+    if (monitor == null) {
+      // Monitoring disabled or unsupported; nothing to start.
+      return;
+    }
+    monitor.start();
+  }
+
private void createInstrumentation() {
Class<? extends TaskTrackerInstrumentation> metricsInst =
getInstrumentationClass(fConf);
@@ -1370,6 +1387,10 @@ public class TaskTracker implements MRCo
healthChecker.stop();
healthChecker = null;
}
+ if (jettyBugMonitor != null) {
+ jettyBugMonitor.shutdown();
+ jettyBugMonitor = null;
+ }
}
/**
Added: hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java?rev=1243230&view=auto
==============================================================================
--- hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java (added)
+++ hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/mapred/TestJettyBugMonitor.java Sun Feb 12 10:35:57 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer;
+import org.junit.Test;
+
+
+public class TestJettyBugMonitor {
+  private final Configuration conf = new Configuration();
+
+  /**
+   * Start a real HttpServer and check that exactly one Jetty selector
+   * thread is discovered.
+   */
+  @Test(timeout=20000)
+  public void testGetJettyThreads() throws Exception {
+    JettyBugMonitor monitor = new JettyBugMonitor(conf);
+
+    // Make sure the webapps directory for the "test" server exists.
+    String webappsRoot = System.getProperty("build.webapps", "build/webapps");
+    new File(webappsRoot + "/test").mkdirs();
+
+    HttpServer server = new HttpServer("test", "0.0.0.0", 0, true);
+    server.start();
+    try {
+      List<Long> selectorIds = monitor.waitForJettyThreads();
+      assertEquals(1, selectorIds.size());
+    } finally {
+      server.stop();
+    }
+  }
+
+  /**
+   * Check that the CPU monitoring requests an abort when the monitored
+   * thread spins. A busy-looping thread stands in for the Jetty selector.
+   */
+  @Test(timeout=5000)
+  public void testMonitoring() throws Exception {
+    // Thread that burns CPU, posing as a spinning selector.
+    SpinningThread spinner = new SpinningThread();
+    spinner.start();
+    final long spinnerId = spinner.getId();
+    // Counted down by the monitor in place of exiting the JVM.
+    final CountDownLatch abortRequested = new CountDownLatch(1);
+
+    conf.setLong(JettyBugMonitor.CHECK_INTERVAL_KEY, 1000);
+    JettyBugMonitor monitor = null;
+    try {
+      monitor = new JettyBugMonitor(conf) {
+        @Override
+        protected List<Long> waitForJettyThreads() {
+          // Watch only the spinner instead of real Jetty threads.
+          return Collections.<Long>singletonList(spinnerId);
+        }
+
+        @Override
+        protected void doAbort() {
+          // signal abort to main thread
+          abortRequested.countDown();
+        }
+      };
+      monitor.start();
+
+      abortRequested.await();
+    } finally {
+      spinner.finished = true;
+      spinner.join();
+
+      if (monitor != null) {
+        monitor.shutdown();
+      }
+    }
+  }
+
+  /** Thread that busy-loops until {@code finished} is set. */
+  private static class SpinningThread extends Thread {
+    private volatile boolean finished = false;
+
+    @Override
+    public void run() {
+      while (!finished) {
+        // intentionally empty: spin to consume CPU
+      }
+    }
+  }
+}