You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/05/30 17:37:47 UTC
samza git commit: SAMZA-1724: Guarantee exit from ApplicationRunnerMain during deploys
Repository: samza
Updated Branches:
refs/heads/master 5534fba33 -> b9814daf0
SAMZA-1724: Guarantee exit from ApplicationRunnerMain during deploys
Author: Prateek Maheshwari <pm...@LM-LSNSCDW6508.linkedin.biz>
Reviewers: Jagadish Venkatraman <vj...@gmail.com>
Closes #538 from prateekm/process-exit
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b9814daf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b9814daf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b9814daf
Branch: refs/heads/master
Commit: b9814daf029d1e9017638936b75fff205696b973
Parents: 5534fba
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Wed May 30 10:37:23 2018 -0700
Committer: Prateek Maheshwari <pm...@LM-LSNSCDW6508.linkedin.biz>
Committed: Wed May 30 10:37:23 2018 -0700
----------------------------------------------------------------------
.../SamzaContainerExceptionHandler.java | 61 -----------------
.../samza/runtime/ApplicationRunnerMain.java | 9 +++
.../samza/runtime/LocalContainerRunner.java | 10 ++-
.../util/SamzaUncaughtExceptionHandler.java | 69 ++++++++++++++++++++
.../TestSamzaContainerExceptionHandler.java | 39 -----------
.../TestSamzaUncaughtExceptionHandler.java | 40 ++++++++++++
6 files changed, 126 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
deleted file mode 100644
index 229f5ef..0000000
--- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerExceptionHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.container;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An UncaughtExceptionHandler for SamzaContainer that simply executes the configured {@link #runnable}
- * when any thread throws an uncaught exception.
- */
-public class SamzaContainerExceptionHandler implements UncaughtExceptionHandler {
- private static final Logger LOGGER = LoggerFactory.getLogger(SamzaContainerExceptionHandler.class);
- private final Runnable runnable;
-
- public SamzaContainerExceptionHandler(Runnable runnable) {
- this.runnable = runnable;
- }
- /**
- * Method invoked when the given thread terminates due to the
- * given uncaught exception.
- * <p>Any exception thrown by this method will be ignored by the
- * Java Virtual Machine.
- *
- * @param t the thread
- * @param e the exception
- */
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- String msg = String.format("Uncaught exception in thread %s.", t.getName());
- LOGGER.error(msg, e);
- System.err.println(msg);
- e.printStackTrace(System.err);
- try {
- Util.logThreadDump("Thread dump from uncaught exception handler.");
- runnable.run();
- } catch (Throwable throwable) {
- // Ignore to avoid further exception propagation
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
index 84427ea..f9f7467 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
@@ -23,6 +23,7 @@ import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
import org.apache.samza.job.JobRunner$;
import org.apache.samza.util.CommandLine;
import org.apache.samza.util.Util;
@@ -52,6 +53,12 @@ public class ApplicationRunnerMain {
}
public static void main(String[] args) throws Exception {
+ Thread.setDefaultUncaughtExceptionHandler(
+ new SamzaUncaughtExceptionHandler(() -> {
+ System.out.println("Exiting process now.");
+ System.exit(1);
+ }));
+
ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerCommandLine();
OptionSet options = cmdLine.parser().parse(args);
Config orgConfig = cmdLine.loadConfig(options);
@@ -78,6 +85,8 @@ public class ApplicationRunnerMain {
} else {
JobRunner$.MODULE$.main(args);
}
+
+ System.exit(0);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 7751241..66176d7 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -31,7 +31,7 @@ import org.apache.samza.container.ContainerHeartbeatClient;
import org.apache.samza.container.ContainerHeartbeatMonitor;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainer$;
-import org.apache.samza.container.SamzaContainerExceptionHandler;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
import org.apache.samza.container.SamzaContainerListener;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.model.JobModel;
@@ -78,6 +78,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
config,
ScalaJavaUtil.toScalaMap(new HashMap<>()),
taskFactory);
+
container.setContainerListener(
new SamzaContainerListener() {
@Override
@@ -96,9 +97,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
containerRunnerException = t;
}
});
+
startContainerHeartbeatMonitor();
container.run();
stopContainerHeartbeatMonitor();
+
if (containerRunnerException != null) {
log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
System.exit(1);
@@ -127,10 +130,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
public static void main(String[] args) throws Exception {
Thread.setDefaultUncaughtExceptionHandler(
- new SamzaContainerExceptionHandler(() -> {
+ new SamzaUncaughtExceptionHandler(() -> {
log.info("Exiting process now.");
System.exit(1);
}));
+
String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
log.info(String.format("Got container ID: %s", containerId));
System.out.println(String.format("Container ID: %s", containerId));
@@ -153,6 +157,8 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
LocalContainerRunner localContainerRunner = new LocalContainerRunner(jobModel, containerId);
localContainerRunner.run(streamApp);
+
+ System.exit(0);
}
private void startContainerHeartbeatMonitor() {
http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java b/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java
new file mode 100644
index 0000000..d94b2d9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An UncaughtExceptionHandler that logs the uncaught exception, logs a thread dump, and then
+ * executes the provided {@code runnable}.
+ * <p>
+ * Example usage: Exit process if any thread throws an uncaught exception:
+ * <pre>
+ * Thread.setDefaultUncaughtExceptionHandler(
+ * new SamzaUncaughtExceptionHandler(() -> {
+ * System.exit(1);
+ * })
+ * );
+ * </pre>
+ */
+public class SamzaUncaughtExceptionHandler implements UncaughtExceptionHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SamzaUncaughtExceptionHandler.class);
+ private final Runnable runnable;
+
+ public SamzaUncaughtExceptionHandler(Runnable runnable) {
+ this.runnable = runnable;
+ }
+ /**
+ * Method invoked when the given thread terminates due to the
+ * given uncaught exception.
+ * <p>Any exception thrown by this method will be ignored by the
+ * Java Virtual Machine.
+ *
+ * @param t the thread
+ * @param e the exception
+ */
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ String msg = String.format("Uncaught exception in thread %s.", t.getName());
+ LOGGER.error(msg, e);
+ System.err.println(msg);
+ e.printStackTrace(System.err);
+ try {
+ Util.logThreadDump("Thread dump from uncaught exception handler.");
+ runnable.run();
+ } catch (Throwable throwable) {
+ // Ignore to avoid further exception propagation
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
deleted file mode 100644
index 387bbd4..0000000
--- a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerExceptionHandler.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.container;
-
-import org.apache.samza.SamzaException;
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestSamzaContainerExceptionHandler {
-
- @Test
- public void testExceptionHandler() {
- final AtomicBoolean exitCalled = new AtomicBoolean(false);
- Thread.UncaughtExceptionHandler exceptionHandler =
- new SamzaContainerExceptionHandler(() -> exitCalled.getAndSet(true));
- exceptionHandler.uncaughtException(Thread.currentThread(), new SamzaException());
- assertTrue(exitCalled.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/b9814daf/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java
new file mode 100644
index 0000000..d15f7da
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestSamzaUncaughtExceptionHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestSamzaUncaughtExceptionHandler {
+
+ @Test
+ public void testExceptionHandler() {
+ final AtomicBoolean exitCalled = new AtomicBoolean(false);
+ Thread.UncaughtExceptionHandler exceptionHandler =
+ new SamzaUncaughtExceptionHandler(() -> exitCalled.getAndSet(true));
+ exceptionHandler.uncaughtException(Thread.currentThread(), new SamzaException());
+ assertTrue(exitCalled.get());
+ }
+}