You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/25 23:27:15 UTC
samza git commit: SAMZA-1220 : Add thread name to SamzaContainer shutdown hook and prevent shutdown deadlock
Repository: samza
Updated Branches:
refs/heads/master 49cf06fc5 -> edc872272
SAMZA-1220 : Add thread name to SamzaContainer shutdown hook and prevent shutdown deadlock
* SamzaContainerExceptionHandler is written in Java and used by LocalContainerRunner.java
Author: nramesh <nr...@linkedin.com>
Reviewers: Jagadish Venkataraman <jv...@linkedin.com>, Prateek Maheshwari <pm...@linkedin.com>
Closes #139 from navina/SAMZA-1220
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/edc87227
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/edc87227
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/edc87227
Branch: refs/heads/master
Commit: edc872272cb6bb3a30a1bb8fafc7dedee333d8c2
Parents: 49cf06f
Author: Navina Ramesh <na...@apache.org>
Authored: Tue Apr 25 16:27:02 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Tue Apr 25 16:27:02 2017 -0700
----------------------------------------------------------------------
.../SamzaContainerExceptionHandler.java | 57 ++++++++++++++++++++
.../samza/runtime/LocalContainerRunner.java | 25 ++++-----
.../apache/samza/container/SamzaContainer.scala | 31 ++++++++---
.../SamzaContainerExceptionHandler.scala | 35 ------------
.../TestSamzaContainerExceptionHandler.java | 39 ++++++++++++++
.../samza/runtime/TestLocalContainerRunner.java | 49 -----------------
.../TestSamzaContainerExceptionHandler.scala | 36 -------------
7 files changed, 130 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
new file mode 100644
index 0000000..0381794
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An UncaughtExceptionHandler for SamzaContainer that logs the uncaught exception and then runs the
+ * {@link Runnable} supplied at construction time; anything that runnable throws is swallowed.
+ */
+public class SamzaContainerExceptionHandler implements UncaughtExceptionHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SamzaContainerExceptionHandler.class);
+ private final Runnable runnable;
+
+ public SamzaContainerExceptionHandler(Runnable runnable) {
+ this.runnable = runnable;
+ }
+ /**
+ * Logs the uncaught exception together with the name of the thread that threw it,
+ * prints its stack trace to {@code System.err}, and then runs the configured runnable.
+ * <p>Any {@link Throwable} thrown by the runnable is caught and discarded,
+ * so nothing propagates out of this handler.
+ *
+ * @param t the thread that terminated; only its name is used, for the log message
+ * @param e the uncaught exception that terminated the thread
+ */
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOGGER.error(
+ String.format("Uncaught exception in thread (name=%s). Exiting process now.", t.getName()), e);
+ e.printStackTrace(System.err);
+ try {
+ runnable.run();
+ } catch (Throwable throwable) {
+ // Deliberately swallowed: a failure in the runnable must not propagate out of the handler.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index d8a6ecd..80350df 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -19,8 +19,6 @@
package org.apache.samza.runtime;
-import java.util.HashMap;
-import java.util.Random;
import org.apache.log4j.MDC;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
@@ -29,6 +27,7 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.container.SamzaContainerExceptionHandler;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
@@ -40,6 +39,9 @@ import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Random;
+
/**
* LocalContainerRunner is the local runner for Yarn {@link SamzaContainer}s. It is an intermediate step to
* have a local runner for yarn before we consolidate the Yarn container and coordination into a
@@ -99,11 +101,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
}
public static void main(String[] args) throws Exception {
- setExceptionHandler(() -> {
- log.info("Exiting process now.");
- System.exit(1);
- });
-
+ Thread.setDefaultUncaughtExceptionHandler(
+ new SamzaContainerExceptionHandler(() -> {
+ log.info("Exiting process now.");
+ System.exit(1);
+ }));
String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
log.info(String.format("Got container ID: %s", containerId));
String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
@@ -124,13 +126,4 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
new LocalContainerRunner(jobModel, containerId).run(streamApp);
}
-
- /* package private */ static void setExceptionHandler(Runnable runnable) {
- Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> {
- log.error(String.format("Uncaught exception in thread (name=%s).", t.getName()), e);
- e.printStackTrace(System.err);
- runnable.run();
- };
- Thread.setDefaultUncaughtExceptionHandler(exceptionHandler);
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index a30b793..8481c92 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -628,6 +628,7 @@ class SamzaContainer(
val shutdownMs = containerContext.config.getShutdownMs.getOrElse(5000L)
private val runLoopStartLatch: CountDownLatch = new CountDownLatch(1)
+ var shutdownHookThread: Thread = null
def awaitStart(timeoutMs: Long): Boolean = {
try {
@@ -665,6 +666,8 @@ class SamzaContainer(
} finally {
info("Shutting down.")
+ removeShutdownHook
+
shutdownConsumers
shutdownTask
shutdownStores
@@ -803,21 +806,37 @@ class SamzaContainer(
def addShutdownHook {
val runLoopThread = Thread.currentThread()
- Runtime.getRuntime().addShutdownHook(new Thread() {
+ shutdownHookThread = new Thread("CONTAINER-SHUTDOWN-HOOK") {
override def run() = {
info("Shutting down, will wait up to %s ms" format shutdownMs)
runLoop match {
case runLoop: RunLoop => runLoop.shutdown
case asyncRunLoop: AsyncRunLoop => asyncRunLoop.shutdown()
}
- runLoopThread.join(shutdownMs)
- if (runLoopThread.isAlive) {
- warn("Did not shut down within %s ms, exiting" format shutdownMs)
- } else {
+ try {
+ runLoopThread.join(shutdownMs)
+ } catch {
+ case e: Throwable => // Ignore to avoid deadlock with uncaughtExceptionHandler. See SAMZA-1220
+ error("Did not shut down within %s ms, exiting" format shutdownMs, e)
+ }
+ if (!runLoopThread.isAlive) {
info("Shutdown complete")
}
}
- })
+ }
+ Runtime.getRuntime().addShutdownHook(shutdownHookThread)
+ }
+
+ def removeShutdownHook = {
+ try {
+ if (shutdownHookThread != null) {
+ Runtime.getRuntime.removeShutdownHook(shutdownHookThread)
+ }
+ } catch {
+ case e: IllegalStateException => {
+ // removeShutdownHook throws this when the JVM is already shutting down, so it is safe to ignore.
+ }
+ }
 }
def shutdownConsumers {
http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
deleted file mode 100644
index da4c098..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.container
-
-import java.lang.Thread.UncaughtExceptionHandler
-import org.apache.samza.util.Logging
-
-/**
- * An UncaughtExceptionHandler that simply shuts down when any thread throws
- * an uncaught exception.
- */
-class SamzaContainerExceptionHandler(exit: () => Unit) extends UncaughtExceptionHandler with Logging {
- def uncaughtException(t: Thread, e: Throwable) {
- error("Uncaught exception in thread (name=%s). Exiting process now.".format(t.getName), e)
- e.printStackTrace(System.err);
- exit()
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
new file mode 100644
index 0000000..387bbd4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import org.apache.samza.SamzaException;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestSamzaContainerExceptionHandler {
+ // Calls the handler directly on the current thread and checks that the supplied runnable was run.
+ @Test
+ public void testExceptionHandler() {
+ final AtomicBoolean exitCalled = new AtomicBoolean(false);
+ Thread.UncaughtExceptionHandler exceptionHandler =
+ new SamzaContainerExceptionHandler(() -> exitCalled.getAndSet(true));
+ exceptionHandler.uncaughtException(Thread.currentThread(), new SamzaException());
+ assertTrue(exitCalled.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java
deleted file mode 100644
index f9465e3..0000000
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.runtime;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class TestLocalContainerRunner {
- private boolean caughtException = false;
-
- @Test
- public void testUncaughtExceptionHandler() throws Exception {
- Runnable runnable = () -> { caughtException = true; };
- LocalContainerRunner.setExceptionHandler(runnable);
-
- try {
- ((String) null).length();
- } catch (Exception e) {
- // catch null pointer exception
- }
- assertFalse(caughtException);
-
- Thread t = new Thread(() -> {
- throw new RuntimeException("Uncaught exception in another thread. Catch this.");
- });
- t.start();
- t.join();
- assertTrue(caughtException);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
deleted file mode 100644
index b1d100c..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.container
-
-import org.junit.Test
-import org.junit.Assert._
-import org.junit.Before
-import org.apache.samza.SamzaException
-import org.junit.After
-
-class TestSamzaContainerExceptionHandler {
- @Test
- def testShutdownProcess {
- var exitCalled = false
- val exceptionHandler = new SamzaContainerExceptionHandler(() => exitCalled = true)
- exceptionHandler.uncaughtException(Thread.currentThread, new SamzaException)
- assertTrue(exitCalled)
- }
-}