You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/25 23:27:15 UTC

samza git commit: SAMZA-1220 : Add thread name to SamzaContainer shutdown hook and prevent shutdown deadlock

Repository: samza
Updated Branches:
  refs/heads/master 49cf06fc5 -> edc872272


SAMZA-1220 : Add thread name to SamzaContainer shutdown hook and prevent shutdown deadlock

* SamzaContainerExceptionHandler is written in Java and used by LocalContainerRunner.java

Author: nramesh <nr...@linkedin.com>

Reviewers: Jagadish Venkataraman <jv...@linkedin.com>, Prateek Maheshwari <pm...@linkedin.com>

Closes #139 from navina/SAMZA-1220


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/edc87227
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/edc87227
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/edc87227

Branch: refs/heads/master
Commit: edc872272cb6bb3a30a1bb8fafc7dedee333d8c2
Parents: 49cf06f
Author: Navina Ramesh <na...@apache.org>
Authored: Tue Apr 25 16:27:02 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Tue Apr 25 16:27:02 2017 -0700

----------------------------------------------------------------------
 .../SamzaContainerExceptionHandler.java         | 57 ++++++++++++++++++++
 .../samza/runtime/LocalContainerRunner.java     | 25 ++++-----
 .../apache/samza/container/SamzaContainer.scala | 31 ++++++++---
 .../SamzaContainerExceptionHandler.scala        | 35 ------------
 .../TestSamzaContainerExceptionHandler.java     | 39 ++++++++++++++
 .../samza/runtime/TestLocalContainerRunner.java | 49 -----------------
 .../TestSamzaContainerExceptionHandler.scala    | 36 -------------
 7 files changed, 130 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
new file mode 100644
index 0000000..0381794
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link UncaughtExceptionHandler} for SamzaContainer that logs the uncaught exception and then runs
+ * the {@link Runnable} supplied at construction; a failure of that callback is logged, never rethrown.
+ */
+public class SamzaContainerExceptionHandler implements UncaughtExceptionHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SamzaContainerExceptionHandler.class);
+  private final Runnable runnable;
+  /** Creates a handler that runs {@code runnable} after logging each uncaught exception. */
+  public SamzaContainerExceptionHandler(Runnable runnable) {
+    this.runnable = runnable;
+  }
+  /**
+   * Logs the uncaught exception, prints its stack trace to {@code System.err}, and then runs
+   * the callback passed to the constructor.
+   * <p>A {@link Throwable} thrown by the callback is logged and swallowed so that it cannot
+   * propagate out of this handler.
+   *
+   * @param t the thread that terminated
+   * @param e the uncaught exception that terminated it
+   */
+  @Override
+  public void uncaughtException(Thread t, Throwable e) {
+    LOGGER.error(
+        String.format("Uncaught exception in thread (name=%s). Exiting process now.", t.getName()), e);
+    e.printStackTrace(System.err);
+    try {
+      runnable.run();
+    } catch (Throwable throwable) {
+      LOGGER.error("Uncaught-exception callback failed; not rethrowing to avoid further propagation.", throwable);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index d8a6ecd..80350df 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -19,8 +19,6 @@
 
 package org.apache.samza.runtime;
 
-import java.util.HashMap;
-import java.util.Random;
 import org.apache.log4j.MDC;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
@@ -29,6 +27,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.container.SamzaContainerExceptionHandler;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -40,6 +39,9 @@ import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Random;
+
 /**
  * LocalContainerRunner is the local runner for Yarn {@link SamzaContainer}s. It is an intermediate step to
  * have a local runner for yarn before we consolidate the Yarn container and coordination into a
@@ -99,11 +101,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
   }
 
   public static void main(String[] args) throws Exception {
-    setExceptionHandler(() -> {
-        log.info("Exiting process now.");
-        System.exit(1);
-      });
-
+    Thread.setDefaultUncaughtExceptionHandler(
+        new SamzaContainerExceptionHandler(() -> {
+          log.info("Exiting process now.");
+          System.exit(1);
+        }));
     String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
     log.info(String.format("Got container ID: %s", containerId));
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
@@ -124,13 +126,4 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
     StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
     new LocalContainerRunner(jobModel, containerId).run(streamApp);
   }
-
-  /* package private */ static void setExceptionHandler(Runnable runnable) {
-    Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> {
-      log.error(String.format("Uncaught exception in thread (name=%s).", t.getName()), e);
-      e.printStackTrace(System.err);
-      runnable.run();
-    };
-    Thread.setDefaultUncaughtExceptionHandler(exceptionHandler);
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index a30b793..8481c92 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -628,6 +628,7 @@ class SamzaContainer(
 
   val shutdownMs = containerContext.config.getShutdownMs.getOrElse(5000L)
   private val runLoopStartLatch: CountDownLatch = new CountDownLatch(1)
+  var shutdownHookThread: Thread = null
 
   def awaitStart(timeoutMs: Long): Boolean = {
     try {
@@ -665,6 +666,8 @@ class SamzaContainer(
     } finally {
       info("Shutting down.")
 
+      removeShutdownHook
+
       shutdownConsumers
       shutdownTask
       shutdownStores
@@ -803,21 +806,37 @@ class SamzaContainer(
 
   def addShutdownHook {
     val runLoopThread = Thread.currentThread()
-    Runtime.getRuntime().addShutdownHook(new Thread() {
+    shutdownHookThread = new Thread("CONTAINER-SHUTDOWN-HOOK") {
       override def run() = {
         info("Shutting down, will wait up to %s ms" format shutdownMs)
         runLoop match {
           case runLoop: RunLoop => runLoop.shutdown
           case asyncRunLoop: AsyncRunLoop => asyncRunLoop.shutdown()
         }
-        runLoopThread.join(shutdownMs)
-        if (runLoopThread.isAlive) {
-          warn("Did not shut down within %s ms, exiting" format shutdownMs)
-        } else {
+        try {
+          runLoopThread.join(shutdownMs)
+        } catch {
+          case e: Throwable => // Log, don't rethrow: avoids deadlock with uncaughtExceptionHandler. See SAMZA-1220
+            error("Failed while waiting up to %s ms for the run loop thread to exit" format shutdownMs, e)
+        }
+        if (runLoopThread.isAlive) warn("Did not shut down within %s ms, exiting" format shutdownMs) else {
           info("Shutdown complete")
         }
       }
-    })
+    }
+    Runtime.getRuntime().addShutdownHook(shutdownHookThread)
+  }
+
+  def removeShutdownHook = {
+    try {
+      if (shutdownHookThread != null) {
+        Runtime.getRuntime.removeShutdownHook(shutdownHookThread)
+      }
+    } catch {
+      case e: IllegalStateException => {
+        // Thrown when the JVM is already shutting down, so safe to ignore.
+      }
+    }
   }
 
   def shutdownConsumers {

http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
deleted file mode 100644
index da4c098..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.container
-
-import java.lang.Thread.UncaughtExceptionHandler
-import org.apache.samza.util.Logging
-
-/**
- * An UncaughtExceptionHandler that simply shuts down when any thread throws
- * an uncaught exception.
- */
-class SamzaContainerExceptionHandler(exit: () => Unit) extends UncaughtExceptionHandler with Logging {
-  def uncaughtException(t: Thread, e: Throwable) {
-    error("Uncaught exception in thread (name=%s). Exiting process now.".format(t.getName), e)
-    e.printStackTrace(System.err);
-    exit()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
new file mode 100644
index 0000000..387bbd4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import org.apache.samza.SamzaException;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestSamzaContainerExceptionHandler {
+  /** Verifies that the handler runs its callback when handed an uncaught exception. */
+  @Test
+  public void testExceptionHandler() {
+    final AtomicBoolean callbackInvoked = new AtomicBoolean();
+    final Runnable exitCallback = () -> callbackInvoked.set(true);
+    new SamzaContainerExceptionHandler(exitCallback)
+        .uncaughtException(Thread.currentThread(), new SamzaException());
+    assertTrue(callbackInvoked.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java
deleted file mode 100644
index f9465e3..0000000
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.runtime;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class TestLocalContainerRunner {
-  private boolean caughtException = false;
-
-  @Test
-  public void testUncaughtExceptionHandler() throws Exception {
-    Runnable runnable = () -> { caughtException = true; };
-    LocalContainerRunner.setExceptionHandler(runnable);
-
-    try {
-      ((String) null).length();
-    } catch (Exception e) {
-      // catch null pointer exception
-    }
-    assertFalse(caughtException);
-
-    Thread t = new Thread(() -> {
-        throw new RuntimeException("Uncaught exception in another thread. Catch this.");
-      });
-    t.start();
-    t.join();
-    assertTrue(caughtException);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/edc87227/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
deleted file mode 100644
index b1d100c..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.container
-
-import org.junit.Test
-import org.junit.Assert._
-import org.junit.Before
-import org.apache.samza.SamzaException
-import org.junit.After
-
-class TestSamzaContainerExceptionHandler {
-  @Test
-  def testShutdownProcess {
-    var exitCalled = false
-    val exceptionHandler = new SamzaContainerExceptionHandler(() => exitCalled = true)
-    exceptionHandler.uncaughtException(Thread.currentThread, new SamzaException)
-    assertTrue(exitCalled)
-  }
-}