You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/05/30 17:37:47 UTC

samza git commit: SAMZA-1724: Guarantee exit from ApplicationRunnerMain during deploys

Repository: samza
Updated Branches:
  refs/heads/master 5534fba33 -> b9814daf0


SAMZA-1724: Guarantee exit from ApplicationRunnerMain during deploys

Author: Prateek Maheshwari <pm...@LM-LSNSCDW6508.linkedin.biz>

Reviewers: Jagadish Venkatraman <vj...@gmail.com>

Closes #538 from prateekm/process-exit


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b9814daf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b9814daf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b9814daf

Branch: refs/heads/master
Commit: b9814daf029d1e9017638936b75fff205696b973
Parents: 5534fba
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Wed May 30 10:37:23 2018 -0700
Committer: Prateek Maheshwari <pm...@LM-LSNSCDW6508.linkedin.biz>
Committed: Wed May 30 10:37:23 2018 -0700

----------------------------------------------------------------------
 .../SamzaContainerExceptionHandler.java         | 61 -----------------
 .../samza/runtime/ApplicationRunnerMain.java    |  9 +++
 .../samza/runtime/LocalContainerRunner.java     | 10 ++-
 .../util/SamzaUncaughtExceptionHandler.java     | 69 ++++++++++++++++++++
 .../TestSamzaContainerExceptionHandler.java     | 39 -----------
 .../TestSamzaUncaughtExceptionHandler.java      | 40 ++++++++++++
 6 files changed, 126 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
deleted file mode 100644
index 229f5ef..0000000
--- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.container;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An UncaughtExceptionHandler for SamzaContainer that simply executes the configured {@link #runnable}
- * when any thread throws an uncaught exception.
- */
-public class SamzaContainerExceptionHandler implements UncaughtExceptionHandler {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SamzaContainerExceptionHandler.class);
-  private final Runnable runnable;
-
-  public SamzaContainerExceptionHandler(Runnable runnable) {
-    this.runnable = runnable;
-  }
-  /**
-   * Method invoked when the given thread terminates due to the
-   * given uncaught exception.
-   * <p>Any exception thrown by this method will be ignored by the
-   * Java Virtual Machine.
-   *
-   * @param t the thread
-   * @param e the exception
-   */
-  @Override
-  public void uncaughtException(Thread t, Throwable e) {
-    String msg = String.format("Uncaught exception in thread %s.", t.getName());
-    LOGGER.error(msg, e);
-    System.err.println(msg);
-    e.printStackTrace(System.err);
-    try {
-      Util.logThreadDump("Thread dump from uncaught exception handler.");
-      runnable.run();
-    } catch (Throwable throwable) {
-      // Ignore to avoid further exception propagation
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
index 84427ea..f9f7467 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
@@ -23,6 +23,7 @@ import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.apache.samza.job.JobRunner$;
 import org.apache.samza.util.CommandLine;
 import org.apache.samza.util.Util;
@@ -52,6 +53,12 @@ public class ApplicationRunnerMain {
   }
 
   public static void main(String[] args) throws Exception {
+    Thread.setDefaultUncaughtExceptionHandler(
+        new SamzaUncaughtExceptionHandler(() -> {
+          System.out.println("Exiting process now.");
+          System.exit(1);
+        }));
+
     ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerCommandLine();
     OptionSet options = cmdLine.parser().parse(args);
     Config orgConfig = cmdLine.loadConfig(options);
@@ -78,6 +85,8 @@ public class ApplicationRunnerMain {
     } else {
       JobRunner$.MODULE$.main(args);
     }
+
+    System.exit(0);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 7751241..66176d7 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -31,7 +31,7 @@ import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
-import org.apache.samza.container.SamzaContainerExceptionHandler;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
@@ -78,6 +78,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
         config,
         ScalaJavaUtil.toScalaMap(new HashMap<>()),
         taskFactory);
+
     container.setContainerListener(
         new SamzaContainerListener() {
           @Override
@@ -96,9 +97,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
             containerRunnerException = t;
           }
         });
+
     startContainerHeartbeatMonitor();
     container.run();
     stopContainerHeartbeatMonitor();
+    
     if (containerRunnerException != null) {
       log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
       System.exit(1);
@@ -127,10 +130,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
 
   public static void main(String[] args) throws Exception {
     Thread.setDefaultUncaughtExceptionHandler(
-        new SamzaContainerExceptionHandler(() -> {
+        new SamzaUncaughtExceptionHandler(() -> {
           log.info("Exiting process now.");
           System.exit(1);
         }));
+
     String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
     log.info(String.format("Got container ID: %s", containerId));
     System.out.println(String.format("Container ID: %s", containerId));
@@ -153,6 +157,8 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
     StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
     LocalContainerRunner localContainerRunner = new LocalContainerRunner(jobModel, containerId);
     localContainerRunner.run(streamApp);
+
+    System.exit(0);
   }
 
   private void startContainerHeartbeatMonitor() {

http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java b/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java
new file mode 100644
index 0000000..d94b2d9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An UncaughtExceptionHandler that logs the uncaught exception, logs a thread dump, and then
+ * executes the provided {@code runnable}.
+ * <p>
+ * Example usage: Exit process if any thread throws an uncaught exception:
+ * <pre>
+ * Thread.setDefaultUncaughtExceptionHandler(
+ *   new SamzaUncaughtExceptionHandler(() -&gt; {
+ *     System.exit(1);
+ *   })
+ * );
+ * </pre>
+ */
+public class SamzaUncaughtExceptionHandler implements UncaughtExceptionHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SamzaUncaughtExceptionHandler.class);
+  private final Runnable runnable;
+
+  public SamzaUncaughtExceptionHandler(Runnable runnable) {
+    this.runnable = runnable;
+  }
+  /**
+   * Method invoked when the given thread terminates due to the
+   * given uncaught exception.
+   * <p>Any exception thrown by this method will be ignored by the
+   * Java Virtual Machine.
+   *
+   * @param t the thread
+   * @param e the exception
+   */
+  @Override
+  public void uncaughtException(Thread t, Throwable e) {
+    String msg = String.format("Uncaught exception in thread %s.", t.getName());
+    LOGGER.error(msg, e);
+    System.err.println(msg);
+    e.printStackTrace(System.err);
+    try {
+      Util.logThreadDump("Thread dump from uncaught exception handler.");
+      runnable.run();
+    } catch (Throwable throwable) {
+      // Ignore to avoid further exception propagation
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
deleted file mode 100644
index 387bbd4..0000000
--- a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.container;
-
-import org.apache.samza.SamzaException;
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestSamzaContainerExceptionHandler {
-
-  @Test
-  public void testExceptionHandler() {
-    final AtomicBoolean exitCalled = new AtomicBoolean(false);
-    Thread.UncaughtExceptionHandler exceptionHandler =
-        new SamzaContainerExceptionHandler(() -> exitCalled.getAndSet(true));
-    exceptionHandler.uncaughtException(Thread.currentThread(), new SamzaException());
-    assertTrue(exitCalled.get());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java
new file mode 100644
index 0000000..d15f7da
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestSamzaUncaughtExceptionHandler {
+
+  @Test
+  public void testExceptionHandler() {
+    final AtomicBoolean exitCalled = new AtomicBoolean(false);
+    Thread.UncaughtExceptionHandler exceptionHandler =
+        new SamzaUncaughtExceptionHandler(() -> exitCalled.getAndSet(true));
+    exceptionHandler.uncaughtException(Thread.currentThread(), new SamzaException());
+    assertTrue(exitCalled.get());
+  }
+}