You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2019/01/16 00:34:35 UTC
samza git commit: SAMZA-2068: Separating container launch logic into util class
Repository: samza
Updated Branches:
refs/heads/master 61492988d -> 130c474bb
SAMZA-2068: Separating container launch logic into util class
The container launch logic needs to be invoked for beam-runner to run beam containers. This is a small refactoring of LocalContainerRunner.java.
Author: xiliu <xi...@linkedin.com>
Reviewers: Prateek M <pr...@apache.org>
Closes #881 from xinyuiscool/SAMZA-2068
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/130c474b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/130c474b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/130c474b
Branch: refs/heads/master
Commit: 130c474bb44e5641267d2df3e71b0e8093a75c8d
Parents: 6149298
Author: xiliu <xi...@linkedin.com>
Authored: Tue Jan 15 16:34:27 2019 -0800
Committer: xiliu <xi...@linkedin.com>
Committed: Tue Jan 15 16:34:27 2019 -0800
----------------------------------------------------------------------
.../samza/runtime/ContainerLaunchUtil.java | 179 +++++++++++++++++++
.../samza/runtime/LocalContainerRunner.java | 125 +------------
2 files changed, 180 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/130c474b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
new file mode 100644
index 0000000..b483997
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.container.ContainerHeartbeatClient;
+import org.apache.samza.container.ContainerHeartbeatMonitor;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.container.SamzaContainerListener;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.context.JobContextImpl;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.task.TaskFactory;
+import org.apache.samza.task.TaskFactoryUtil;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class ContainerLaunchUtil {
+ private static final Logger log = LoggerFactory.getLogger(ContainerLaunchUtil.class);
+
+ private static volatile Throwable containerRunnerException = null;
+
+ /**
+ * Launches a Samza container in a managed cluster, e.g. Yarn, using the config carried by the job model.
+ * After the container run completes without a recorded failure, the JVM is terminated with exit status 0.
+ * NOTE: this util method is also invoked by Beam SamzaRunner.
+ * Any change here needs to take Beam into account.
+ */
+ public static void run(
+ ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
+ String containerId,
+ JobModel jobModel) {
+ final Config jobConfig = jobModel.getConfig();
+ final Optional<ExternalContext> externalContext = buildExternalContext(jobConfig);
+ run(appDesc, containerId, jobModel, jobConfig, externalContext);
+ // Reached only if the delegated run did not already terminate the process with a failure status.
+ System.exit(0);
+ }
+ // Builds the SamzaContainer for the given id and job model, attaches listeners, runs it, and exits with 1 on recorded failure.
+ private static void run(
+ ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
+ String containerId,
+ JobModel jobModel,
+ Config config,
+ Optional<ExternalContext> externalContextOptional) {
+ TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
+ LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());
+ SamzaContainer container = SamzaContainer$.MODULE$.apply(
+ containerId,
+ jobModel,
+ ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)),
+ taskFactory,
+ JobContextImpl.fromConfigWithDefaults(config),
+ Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
+ Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
+ Option.apply(externalContextOptional.orElse(null)), localityManager);
+ // Lifecycle listener created by the application descriptor's factory; the container callbacks below delegate to it.
+ ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
+ .createInstance(new ProcessorContext() { }, config);
+ // Each container callback logs the event and forwards it; afterFailure also records the Throwable in containerRunnerException.
+ container.setContainerListener(
+ new SamzaContainerListener() {
+ @Override
+ public void beforeStart() {
+ log.info("Before starting the container.");
+ listener.beforeStart();
+ }
+
+ @Override
+ public void afterStart() {
+ log.info("Container Started");
+ listener.afterStart();
+ }
+
+ @Override
+ public void afterStop() {
+ log.info("Container Stopped");
+ listener.afterStop();
+ }
+
+ @Override
+ public void afterFailure(Throwable t) {
+ log.info("Container Failed");
+ containerRunnerException = t;
+ listener.afterFailure(t);
+ }
+ });
+ // createContainerHeartbeatMonitor returns null when no monitor could be created, hence the null checks.
+ ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
+ if (heartbeatMonitor != null) {
+ heartbeatMonitor.start();
+ }
+ // Run the container; the heartbeat monitor, if any, is stopped once run() has returned.
+ container.run();
+ if (heartbeatMonitor != null) {
+ heartbeatMonitor.stop();
+ }
+ // A failure recorded by afterFailure or by the heartbeat-expiry callback makes the process exit with status 1 here.
+ if (containerRunnerException != null) {
+ log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
+ System.exit(1);
+ }
+ }
+
+ private static Optional<ExternalContext> buildExternalContext(Config config) {
+ /*
+ * By default, return an empty ExternalContext (config is currently unused). In a custom fork of Samza, this can be
+ * implemented to pass a non-empty ExternalContext to SamzaContainer. Only config should be used to build the external
+ * context. In the future, components like the application descriptor may not be available to LocalContainerRunner.
+ */
+ return Optional.empty();
+ }
+
+ // TODO: this is going away when SAMZA-1168 is done and the initialization of metrics reporters are done via
+ // LocalApplicationRunner#createStreamProcessor()
+ private static Map<String, MetricsReporter> loadMetricsReporters(
+ ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId, Config config) {
+ Map<String, MetricsReporter> reporters = new HashMap<>();
+ appDesc.getMetricsReporterFactories().forEach((name, factory) ->
+ reporters.put(name, factory.getMetricsReporter(name, containerId, config)));
+ return reporters;
+ }
+
+ /**
+ * Creates a new container heartbeat monitor if possible; on heartbeat expiry it shuts the container down.
+ * @param container the container to monitor
+ * @return a new {@link ContainerHeartbeatMonitor} instance, or null if the execution environment container id is not set
+ */
+ private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container) {
+ String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+ String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
+ if (executionEnvContainerId == null) {
+ log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
+ return null;
+ }
+ log.info("Got execution environment container id: {}", executionEnvContainerId);
+ return new ContainerHeartbeatMonitor(() -> {
+ try {
+ // Record the failure before shutdown() so the exit check after container.run() cannot miss it and exit with 0.
+ containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
+ container.shutdown();
+ } catch (Exception e) {
+ log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
+ System.exit(1);
+ }
+ }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/130c474b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 7e71b00..a3e5acf 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -27,28 +27,12 @@ import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.container.ContainerHeartbeatClient;
-import org.apache.samza.container.ContainerHeartbeatMonitor;
-import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.SamzaContainer;
-import org.apache.samza.container.SamzaContainer$;
-import org.apache.samza.container.SamzaContainerListener;
-import org.apache.samza.context.ExternalContext;
-import org.apache.samza.context.JobContextImpl;
import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.task.TaskFactory;
-import org.apache.samza.task.TaskFactoryUtil;
import org.apache.samza.util.SamzaUncaughtExceptionHandler;
-import org.apache.samza.util.ScalaJavaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import scala.Option;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
import java.util.Random;
/**
@@ -56,7 +40,6 @@ import java.util.Random;
*/
public class LocalContainerRunner {
private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
- private static volatile Throwable containerRunnerException = null;
public static void main(String[] args) throws Exception {
Thread.setDefaultUncaughtExceptionHandler(
@@ -88,113 +71,7 @@ public class LocalContainerRunner {
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
- run(appDesc, containerId, jobModel, config, buildExternalContext(config));
- System.exit(0);
- }
-
- private static void run(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId,
- JobModel jobModel, Config config, Optional<ExternalContext> externalContextOptional) {
- TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
- LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());
- SamzaContainer container = SamzaContainer$.MODULE$.apply(
- containerId,
- jobModel,
- ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)),
- taskFactory,
- JobContextImpl.fromConfigWithDefaults(config),
- Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
- Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
- Option.apply(externalContextOptional.orElse(null)), localityManager);
-
- ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
- .createInstance(new ProcessorContext() { }, config);
-
- container.setContainerListener(
- new SamzaContainerListener() {
- @Override
- public void beforeStart() {
- log.info("Before starting the container.");
- listener.beforeStart();
- }
-
- @Override
- public void afterStart() {
- log.info("Container Started");
- listener.afterStart();
- }
-
- @Override
- public void afterStop() {
- log.info("Container Stopped");
- listener.afterStop();
- }
-
- @Override
- public void afterFailure(Throwable t) {
- log.info("Container Failed");
- containerRunnerException = t;
- listener.afterFailure(t);
- }
- });
-
- ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
- if (heartbeatMonitor != null) {
- heartbeatMonitor.start();
- }
-
- container.run();
- if (heartbeatMonitor != null) {
- heartbeatMonitor.stop();
- }
-
- if (containerRunnerException != null) {
- log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
- System.exit(1);
- }
- }
-
- private static Optional<ExternalContext> buildExternalContext(Config config) {
- /*
- * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
- * a non-empty ExternalContext to SamzaContainer. Only config should be used to build the external context. In the
- * future, components like the application descriptor may not be available to LocalContainerRunner.
- */
- return Optional.empty();
- }
-
- // TODO: this is going away when SAMZA-1168 is done and the initialization of metrics reporters are done via
- // LocalApplicationRunner#createStreamProcessor()
- private static Map<String, MetricsReporter> loadMetricsReporters(
- ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId, Config config) {
- Map<String, MetricsReporter> reporters = new HashMap<>();
- appDesc.getMetricsReporterFactories().forEach((name, factory) ->
- reporters.put(name, factory.getMetricsReporter(name, containerId, config)));
- return reporters;
- }
-
- /**
- * Creates a new container heartbeat monitor if possible.
- * @param container the container to monitor
- * @return a new {@link ContainerHeartbeatMonitor} instance, or null if could not create one
- */
- private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container) {
- String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
- String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
- if (executionEnvContainerId != null) {
- log.info("Got execution environment container id: {}", executionEnvContainerId);
- return new ContainerHeartbeatMonitor(() -> {
- try {
- container.shutdown();
- containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
- } catch (Exception e) {
- log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
- System.exit(1);
- }
- }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
- } else {
- log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
- return null;
- }
+ ContainerLaunchUtil.run(appDesc, containerId, jobModel);
}
}