You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2016/02/08 15:53:56 UTC
[1/4] hadoop git commit: Revert "HADOOP-12749. Create a
threadpoolexecutor that overrides afterExecute to log uncaught
exceptions/errors. Contributed by Sidharta Seethana."
Repository: hadoop
Updated Branches:
refs/heads/branch-2 329df98f4 -> 2451a0a8e
refs/heads/trunk 565af873d -> d37eb828f
Revert "HADOOP-12749. Create a threadpoolexecutor that overrides afterExecute to log uncaught exceptions/errors. Contributed by Sidharta Seethana."
This reverts commit f3bbe0bd020b9efe05d5918ad042d9d4d4b1ca57.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/af218101
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/af218101
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/af218101
Branch: refs/heads/trunk
Commit: af218101e50de0260ab74e5d2f96227a0541e121
Parents: 565af87
Author: Varun Vasudev <vv...@apache.org>
Authored: Mon Feb 8 20:16:44 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Mon Feb 8 20:16:44 2016 +0530
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af218101/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index d99936f..dbfa482 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -685,9 +685,6 @@ Release 2.9.0 - UNRELEASED
HADOOP-12662. The build should fail if a -Dbundle option fails (Kai Zheng
via cmccabe)
- HADOOP-12749. Create a threadpoolexecutor that overrides afterExecute to
- log uncaught exceptions/errors. (Sidharta Seethana via vvasudev)
-
BUG FIXES
HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing
[2/4] hadoop git commit: Revert "HADOOP-12749. Create a
threadpoolexecutor that overrides afterExecute to log uncaught
exceptions/errors. Contributed by Sidharta Seethana."
Posted by vv...@apache.org.
Revert "HADOOP-12749. Create a threadpoolexecutor that overrides afterExecute to log uncaught exceptions/errors. Contributed by Sidharta Seethana."
This reverts commit f3bbe0bd020b9efe05d5918ad042d9d4d4b1ca57.
(cherry picked from commit af218101e50de0260ab74e5d2f96227a0541e121)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/368a11f3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/368a11f3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/368a11f3
Branch: refs/heads/branch-2
Commit: 368a11f3dddb586899e8d7d6428fb14fc41f76f5
Parents: 329df98
Author: Varun Vasudev <vv...@apache.org>
Authored: Mon Feb 8 20:16:44 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Mon Feb 8 20:17:10 2016 +0530
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/368a11f3/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 7c3e630..6a3f773 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -35,9 +35,6 @@ Release 2.9.0 - UNRELEASED
HADOOP-12662. The build should fail if a -Dbundle option fails (Kai Zheng
via cmccabe)
- HADOOP-12749. Create a threadpoolexecutor that overrides afterExecute to
- log uncaught exceptions/errors. (Sidharta Seethana via vvasudev)
-
BUG FIXES
HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing
[3/4] hadoop git commit: HADOOP-12749. Create a threadpoolexecutor
that overrides afterExecute to log uncaught exceptions/errors. Contributed by
Sidharta Seethana.
Posted by vv...@apache.org.
HADOOP-12749. Create a threadpoolexecutor that overrides afterExecute to log uncaught exceptions/errors. Contributed by Sidharta Seethana.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d37eb828
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d37eb828
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d37eb828
Branch: refs/heads/trunk
Commit: d37eb828ffa09d55936964f555ea351b946d286e
Parents: af21810
Author: Varun Vasudev <vv...@apache.org>
Authored: Mon Feb 8 20:19:17 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Mon Feb 8 20:19:17 2016 +0530
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../hadoop/util/concurrent/ExecutorHelper.java | 67 ++++++++++++++
.../hadoop/util/concurrent/HadoopExecutors.java | 96 ++++++++++++++++++++
.../HadoopScheduledThreadPoolExecutor.java | 71 +++++++++++++++
.../concurrent/HadoopThreadPoolExecutor.java | 92 +++++++++++++++++++
.../hadoop/util/concurrent/package-info.java | 26 ++++++
6 files changed, 355 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index dbfa482..d99936f 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -685,6 +685,9 @@ Release 2.9.0 - UNRELEASED
HADOOP-12662. The build should fail if a -Dbundle option fails (Kai Zheng
via cmccabe)
+ HADOOP-12749. Create a threadpoolexecutor that overrides afterExecute to
+ log uncaught exceptions/errors. (Sidharta Seethana via vvasudev)
+
BUG FIXES
HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java
new file mode 100644
index 0000000..3bc9ed9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java
@@ -0,0 +1,67 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/** Helper functions for Executors. */
+public final class ExecutorHelper {
+
+ private static final Log LOG = LogFactory
+ .getLog(ExecutorHelper.class);
+
+ static void logThrowableFromAfterExecute(Runnable r, Throwable t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("afterExecute in thread: " + Thread.currentThread()
+ .getName() + ", runnable type: " + r.getClass().getName());
+ }
+
+ //For additional information, see: https://docs.oracle
+ // .com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor
+ // .html#afterExecute(java.lang.Runnable,%20java.lang.Throwable) .
+
+ if (t == null && r instanceof Future<?>) {
+ try {
+ ((Future<?>) r).get();
+ } catch (ExecutionException ee) {
+ LOG.warn("Execution exception when running task in " +
+ Thread.currentThread().getName());
+ t = ee.getCause();
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread (" + Thread.currentThread() + ") interrupted: ", ie);
+ Thread.currentThread().interrupt();
+ } catch (Throwable throwable) {
+ t = throwable;
+ }
+ }
+
+ if (t != null) {
+ LOG.warn("Caught exception in thread " + Thread
+ .currentThread().getName() + ": ", t);
+ }
+ }
+
+ private ExecutorHelper() {}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
new file mode 100644
index 0000000..1bc6976
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
@@ -0,0 +1,96 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+
+/** Factory methods for ExecutorService, ScheduledExecutorService instances.
+ * These executor service instances provide additional functionality (e.g
+ * logging uncaught exceptions). */
+public final class HadoopExecutors {
+ public static ExecutorService newCachedThreadPool(ThreadFactory
+ threadFactory) {
+ return new HadoopThreadPoolExecutor(0, Integer.MAX_VALUE,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ threadFactory);
+ }
+
+ public static ExecutorService newFixedThreadPool(int nThreads,
+ ThreadFactory threadFactory) {
+ return new HadoopThreadPoolExecutor(nThreads, nThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ threadFactory);
+ }
+
+ public static ExecutorService newFixedThreadPool(int nThreads) {
+ return new HadoopThreadPoolExecutor(nThreads, nThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ }
+
+ //Executors.newSingleThreadExecutor has special semantics - for the
+ // moment we'll delegate to it rather than implement the semantics here.
+ public static ExecutorService newSingleThreadExecutor() {
+ return Executors.newSingleThreadExecutor();
+ }
+
+ //Executors.newSingleThreadExecutor has special semantics - for the
+ // moment we'll delegate to it rather than implement the semantics here.
+ public static ExecutorService newSingleThreadExecutor(ThreadFactory
+ threadFactory) {
+ return Executors.newSingleThreadExecutor(threadFactory);
+ }
+
+ public static ScheduledExecutorService newScheduledThreadPool(
+ int corePoolSize) {
+ return new HadoopScheduledThreadPoolExecutor(corePoolSize);
+ }
+
+ public static ScheduledExecutorService newScheduledThreadPool(
+ int corePoolSize, ThreadFactory threadFactory) {
+ return new HadoopScheduledThreadPoolExecutor(corePoolSize, threadFactory);
+ }
+
+ //Executors.newSingleThreadScheduledExecutor has special semantics - for the
+ // moment we'll delegate to it rather than implement the semantics here
+ public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
+ return Executors.newSingleThreadScheduledExecutor();
+ }
+
+ //Executors.newSingleThreadScheduledExecutor has special semantics - for the
+ // moment we'll delegate to it rather than implement the semantics here
+ public static ScheduledExecutorService newSingleThreadScheduledExecutor(
+ ThreadFactory threadFactory) {
+ return Executors.newSingleThreadScheduledExecutor(threadFactory);
+ }
+
+ //disable instantiation
+ private HadoopExecutors() { }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java
new file mode 100644
index 0000000..8d910b6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java
@@ -0,0 +1,71 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+/** An extension of ScheduledThreadPoolExecutor that provides additional
+ * functionality. */
+public class HadoopScheduledThreadPoolExecutor extends
+ ScheduledThreadPoolExecutor {
+
+ private static final Log LOG = LogFactory
+ .getLog(HadoopScheduledThreadPoolExecutor.class);
+
+ public HadoopScheduledThreadPoolExecutor(int corePoolSize) {
+ super(corePoolSize);
+ }
+
+ public HadoopScheduledThreadPoolExecutor(int corePoolSize,
+ ThreadFactory threadFactory) {
+ super(corePoolSize, threadFactory);
+ }
+
+ public HadoopScheduledThreadPoolExecutor(int corePoolSize,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, handler);
+ }
+
+ public HadoopScheduledThreadPoolExecutor(int corePoolSize,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, threadFactory, handler);
+ }
+
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("beforeExecute in thread: " + Thread.currentThread()
+ .getName() + ", runnable type: " + r.getClass().getName());
+ }
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ ExecutorHelper.logThrowableFromAfterExecute(r, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java
new file mode 100644
index 0000000..bcf26cb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java
@@ -0,0 +1,92 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/** An extension of ThreadPoolExecutor that provides additional functionality.
+ * */
+public final class HadoopThreadPoolExecutor extends ThreadPoolExecutor {
+
+ private static final Log LOG = LogFactory
+ .getLog(HadoopThreadPoolExecutor.class);
+
+ public HadoopThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ }
+
+ public HadoopThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ threadFactory);
+ }
+
+ public HadoopThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ handler);
+ }
+
+ public HadoopThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ threadFactory, handler);
+ }
+
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("beforeExecute in thread: " + Thread.currentThread()
+ .getName() + ", runnable type: " + r.getClass().getName());
+ }
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ ExecutorHelper.logThrowableFromAfterExecute(r, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java
new file mode 100644
index 0000000..2effb65
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
[4/4] hadoop git commit: HADOOP-12749. Create a threadpoolexecutor
that overrides afterExecute to log uncaught exceptions/errors. Contributed by
Sidharta Seethana.
Posted by vv...@apache.org.
HADOOP-12749. Create a threadpoolexecutor that overrides afterExecute to log uncaught exceptions/errors. Contributed by Sidharta Seethana.
(cherry picked from commit d37eb828ffa09d55936964f555ea351b946d286e)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2451a0a8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2451a0a8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2451a0a8
Branch: refs/heads/branch-2
Commit: 2451a0a8e08f122061b2defd0ba5b832f8689e53
Parents: 368a11f
Author: Varun Vasudev <vv...@apache.org>
Authored: Mon Feb 8 20:19:17 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Mon Feb 8 20:19:51 2016 +0530
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../hadoop/util/concurrent/ExecutorHelper.java | 67 ++++++++++++++
.../hadoop/util/concurrent/HadoopExecutors.java | 96 ++++++++++++++++++++
.../HadoopScheduledThreadPoolExecutor.java | 71 +++++++++++++++
.../concurrent/HadoopThreadPoolExecutor.java | 92 +++++++++++++++++++
.../hadoop/util/concurrent/package-info.java | 26 ++++++
6 files changed, 355 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2451a0a8/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 6a3f773..7c3e630 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -35,6 +35,9 @@ Release 2.9.0 - UNRELEASED
HADOOP-12662. The build should fail if a -Dbundle option fails (Kai Zheng
via cmccabe)
+ HADOOP-12749. Create a threadpoolexecutor that overrides afterExecute to
+ log uncaught exceptions/errors. (Sidharta Seethana via vvasudev)
+
BUG FIXES
HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2451a0a8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java
new file mode 100644
index 0000000..3bc9ed9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java
@@ -0,0 +1,67 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/** Helper functions for Executors. */
+public final class ExecutorHelper {
+
+ private static final Log LOG = LogFactory
+ .getLog(ExecutorHelper.class);
+
+ static void logThrowableFromAfterExecute(Runnable r, Throwable t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("afterExecute in thread: " + Thread.currentThread()
+ .getName() + ", runnable type: " + r.getClass().getName());
+ }
+
+ //For additional information, see: https://docs.oracle
+ // .com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor
+ // .html#afterExecute(java.lang.Runnable,%20java.lang.Throwable) .
+
+ if (t == null && r instanceof Future<?>) {
+ try {
+ ((Future<?>) r).get();
+ } catch (ExecutionException ee) {
+ LOG.warn("Execution exception when running task in " +
+ Thread.currentThread().getName());
+ t = ee.getCause();
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread (" + Thread.currentThread() + ") interrupted: ", ie);
+ Thread.currentThread().interrupt();
+ } catch (Throwable throwable) {
+ t = throwable;
+ }
+ }
+
+ if (t != null) {
+ LOG.warn("Caught exception in thread " + Thread
+ .currentThread().getName() + ": ", t);
+ }
+ }
+
+ private ExecutorHelper() {}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2451a0a8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
new file mode 100644
index 0000000..1bc6976
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
@@ -0,0 +1,96 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+
+/** Factory methods for ExecutorService, ScheduledExecutorService instances.
+ * These executor service instances provide additional functionality (e.g
+ * logging uncaught exceptions). */
+public final class HadoopExecutors {
+ public static ExecutorService newCachedThreadPool(ThreadFactory
+ threadFactory) {
+ return new HadoopThreadPoolExecutor(0, Integer.MAX_VALUE,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ threadFactory);
+ }
+
+ public static ExecutorService newFixedThreadPool(int nThreads,
+ ThreadFactory threadFactory) {
+ return new HadoopThreadPoolExecutor(nThreads, nThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ threadFactory);
+ }
+
+ public static ExecutorService newFixedThreadPool(int nThreads) {
+ return new HadoopThreadPoolExecutor(nThreads, nThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ }
+
+ //Executors.newSingleThreadExecutor has special semantics - for the
+ // moment we'll delegate to it rather than implement the semantics here.
+ public static ExecutorService newSingleThreadExecutor() {
+ return Executors.newSingleThreadExecutor();
+ }
+
+ //Executors.newSingleThreadExecutor has special semantics - for the
+ // moment we'll delegate to it rather than implement the semantics here.
+ public static ExecutorService newSingleThreadExecutor(ThreadFactory
+ threadFactory) {
+ return Executors.newSingleThreadExecutor(threadFactory);
+ }
+
+ public static ScheduledExecutorService newScheduledThreadPool(
+ int corePoolSize) {
+ return new HadoopScheduledThreadPoolExecutor(corePoolSize);
+ }
+
+ public static ScheduledExecutorService newScheduledThreadPool(
+ int corePoolSize, ThreadFactory threadFactory) {
+ return new HadoopScheduledThreadPoolExecutor(corePoolSize, threadFactory);
+ }
+
+ //Executors.newSingleThreadScheduledExecutor has special semantics - for the
+ // moment we'll delegate to it rather than implement the semantics here
+ public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
+ return Executors.newSingleThreadScheduledExecutor();
+ }
+
+ //Executors.newSingleThreadScheduledExecutor has special semantics - for the
+ // moment we'll delegate to it rather than implement the semantics here
+ public static ScheduledExecutorService newSingleThreadScheduledExecutor(
+ ThreadFactory threadFactory) {
+ return Executors.newSingleThreadScheduledExecutor(threadFactory);
+ }
+
+ //disable instantiation
+ private HadoopExecutors() { }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2451a0a8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java
new file mode 100644
index 0000000..8d910b6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java
@@ -0,0 +1,71 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+/** An extension of ScheduledThreadPoolExecutor that provides additional
+ * functionality. */
+public class HadoopScheduledThreadPoolExecutor extends
+ ScheduledThreadPoolExecutor {
+
+ private static final Log LOG = LogFactory
+ .getLog(HadoopScheduledThreadPoolExecutor.class);
+
+ public HadoopScheduledThreadPoolExecutor(int corePoolSize) {
+ super(corePoolSize);
+ }
+
+ public HadoopScheduledThreadPoolExecutor(int corePoolSize,
+ ThreadFactory threadFactory) {
+ super(corePoolSize, threadFactory);
+ }
+
+ public HadoopScheduledThreadPoolExecutor(int corePoolSize,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, handler);
+ }
+
+ public HadoopScheduledThreadPoolExecutor(int corePoolSize,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, threadFactory, handler);
+ }
+
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("beforeExecute in thread: " + Thread.currentThread()
+ .getName() + ", runnable type: " + r.getClass().getName());
+ }
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ ExecutorHelper.logThrowableFromAfterExecute(r, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2451a0a8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java
new file mode 100644
index 0000000..bcf26cb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java
@@ -0,0 +1,92 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/** An extension of ThreadPoolExecutor that provides additional functionality.
+ * */
+public final class HadoopThreadPoolExecutor extends ThreadPoolExecutor {
+
+ private static final Log LOG = LogFactory
+ .getLog(HadoopThreadPoolExecutor.class);
+
+ public HadoopThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ }
+
+ public HadoopThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ threadFactory);
+ }
+
+ public HadoopThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ handler);
+ }
+
+ public HadoopThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ threadFactory, handler);
+ }
+
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("beforeExecute in thread: " + Thread.currentThread()
+ .getName() + ", runnable type: " + r.getClass().getName());
+ }
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ ExecutorHelper.logThrowableFromAfterExecute(r, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2451a0a8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java
new file mode 100644
index 0000000..2effb65
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;