You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2019/01/16 00:34:35 UTC

samza git commit: SAMZA-2068: Separating container launch logic into util class

Repository: samza
Updated Branches:
  refs/heads/master 61492988d -> 130c474bb


SAMZA-2068: Separating container launch logic into util class

The Beam runner needs to invoke the container launch logic in order to run Beam containers. This change is a small refactoring that moves that logic out of LocalContainerRunner.java into a new utility class, ContainerLaunchUtil.java.

Author: xiliu <xi...@linkedin.com>

Reviewers: Prateek M <pr...@apache.org>

Closes #881 from xinyuiscool/SAMZA-2068


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/130c474b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/130c474b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/130c474b

Branch: refs/heads/master
Commit: 130c474bb44e5641267d2df3e71b0e8093a75c8d
Parents: 6149298
Author: xiliu <xi...@linkedin.com>
Authored: Tue Jan 15 16:34:27 2019 -0800
Committer: xiliu <xi...@linkedin.com>
Committed: Tue Jan 15 16:34:27 2019 -0800

----------------------------------------------------------------------
 .../samza/runtime/ContainerLaunchUtil.java      | 179 +++++++++++++++++++
 .../samza/runtime/LocalContainerRunner.java     | 125 +------------
 2 files changed, 180 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/130c474b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
new file mode 100644
index 0000000..b483997
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.container.ContainerHeartbeatClient;
+import org.apache.samza.container.ContainerHeartbeatMonitor;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.container.SamzaContainerListener;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.context.JobContextImpl;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.task.TaskFactory;
+import org.apache.samza.task.TaskFactoryUtil;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class ContainerLaunchUtil {
+  private static final Logger log = LoggerFactory.getLogger(ContainerLaunchUtil.class);
+  // Failure recorded by the container listener or the heartbeat callback; read after container.run() returns.
+  private static volatile Throwable containerRunnerException = null;
+
+  /**
+   * Launches a Samza container in a managed cluster, e.g. Yarn, using {@code jobModel.getConfig()} and an
+   * empty external context, then calls {@code System.exit(0)}; the private overload exits with 1 instead if
+   * a container failure was recorded. NOTE: this util method is also invoked by Beam SamzaRunner.
+   * Any change here needs to take Beam into account.
+   */
+  public static void run(
+      ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
+      String containerId,
+      JobModel jobModel) {
+
+    Config config = jobModel.getConfig();
+    run(appDesc, containerId, jobModel, config, buildExternalContext(config));
+
+    System.exit(0); // only reached if the overload did not already exit with status 1
+  }
+  // Builds the container, attaches lifecycle callbacks, starts the optional heartbeat monitor, runs the container.
+  private static void run(
+      ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
+      String containerId,
+      JobModel jobModel,
+      Config config,
+      Optional<ExternalContext> externalContextOptional) {
+    TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
+    LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());
+    SamzaContainer container = SamzaContainer$.MODULE$.apply(
+        containerId,
+        jobModel,
+        ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)),
+        taskFactory,
+        JobContextImpl.fromConfigWithDefaults(config),
+        Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)), // absent -> None
+        Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
+        Option.apply(externalContextOptional.orElse(null)), localityManager);
+
+    ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
+        .createInstance(new ProcessorContext() { }, config);
+    // Each container callback logs and then forwards to the application's ProcessorLifecycleListener.
+    container.setContainerListener(
+        new SamzaContainerListener() {
+          @Override
+          public void beforeStart() {
+            log.info("Before starting the container.");
+            listener.beforeStart();
+          }
+
+          @Override
+          public void afterStart() {
+            log.info("Container Started");
+            listener.afterStart();
+          }
+
+          @Override
+          public void afterStop() {
+            log.info("Container Stopped");
+            listener.afterStop();
+          }
+
+          @Override
+          public void afterFailure(Throwable t) {
+            log.info("Container Failed");
+            containerRunnerException = t; // remembered so the check after container.run() exits with 1
+            listener.afterFailure(t);
+          }
+        });
+
+    ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container); // may be null
+    if (heartbeatMonitor != null) {
+      heartbeatMonitor.start();
+    }
+
+    container.run();
+    if (heartbeatMonitor != null) {
+      heartbeatMonitor.stop();
+    }
+    // A failure recorded by afterFailure or by the heartbeat callback becomes a non-zero exit status.
+    if (containerRunnerException != null) {
+      log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
+      System.exit(1);
+    }
+  }
+
+  private static Optional<ExternalContext> buildExternalContext(Config config) {
+    /*
+     * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
+     * a non-empty ExternalContext to SamzaContainer. Only config should be used to build the external context. In the
+     * future, components like the application descriptor may not be available to LocalContainerRunner.
+     */
+    return Optional.empty();
+  }
+  // Instantiates one MetricsReporter per factory registered on the application descriptor, keyed by name.
+  // TODO: this is going away when SAMZA-1168 is done and the initialization of metrics reporters is done via
+  // LocalApplicationRunner#createStreamProcessor()
+  private static Map<String, MetricsReporter> loadMetricsReporters(
+      ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId, Config config) {
+    Map<String, MetricsReporter> reporters = new HashMap<>();
+    appDesc.getMetricsReporterFactories().forEach((name, factory) ->
+        reporters.put(name, factory.getMetricsReporter(name, containerId, config)));
+    return reporters;
+  }
+
+  /**
+   * Creates a container heartbeat monitor if the execution environment container id env variable is set.
+   * @param container the container to monitor
+   * @return a new {@link ContainerHeartbeatMonitor} instance, or null if could not create one
+   */
+  private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container) {
+    String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL()); // not null-checked below
+    String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
+    if (executionEnvContainerId != null) {
+      log.info("Got execution environment container id: {}", executionEnvContainerId);
+      return new ContainerHeartbeatMonitor(() -> { // callback; presumably run on heartbeat expiry (see message)
+          try {
+            container.shutdown(); // NOTE(review): exception is set only after this returns; verify vs. run() check
+            containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
+          } catch (Exception e) {
+            log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
+            System.exit(1);
+          }
+        }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
+    } else {
+      log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/130c474b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 7e71b00..a3e5acf 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -27,28 +27,12 @@ import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.container.ContainerHeartbeatClient;
-import org.apache.samza.container.ContainerHeartbeatMonitor;
-import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.SamzaContainer;
-import org.apache.samza.container.SamzaContainer$;
-import org.apache.samza.container.SamzaContainerListener;
-import org.apache.samza.context.ExternalContext;
-import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.task.TaskFactory;
-import org.apache.samza.task.TaskFactoryUtil;
 import org.apache.samza.util.SamzaUncaughtExceptionHandler;
-import org.apache.samza.util.ScalaJavaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
-import scala.Option;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
 import java.util.Random;
 
 /**
@@ -56,7 +40,6 @@ import java.util.Random;
  */
 public class LocalContainerRunner {
   private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
-  private static volatile Throwable containerRunnerException = null;
 
   public static void main(String[] args) throws Exception {
     Thread.setDefaultUncaughtExceptionHandler(
@@ -88,113 +71,7 @@ public class LocalContainerRunner {
 
     ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
         ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
-    run(appDesc, containerId, jobModel, config, buildExternalContext(config));
 
-    System.exit(0);
-  }
-
-  private static void run(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId,
-      JobModel jobModel, Config config, Optional<ExternalContext> externalContextOptional) {
-    TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
-    LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());
-    SamzaContainer container = SamzaContainer$.MODULE$.apply(
-        containerId,
-        jobModel,
-        ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)),
-        taskFactory,
-        JobContextImpl.fromConfigWithDefaults(config),
-        Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
-        Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
-        Option.apply(externalContextOptional.orElse(null)), localityManager);
-
-    ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
-        .createInstance(new ProcessorContext() { }, config);
-
-    container.setContainerListener(
-        new SamzaContainerListener() {
-          @Override
-          public void beforeStart() {
-            log.info("Before starting the container.");
-            listener.beforeStart();
-          }
-
-          @Override
-          public void afterStart() {
-            log.info("Container Started");
-            listener.afterStart();
-          }
-
-          @Override
-          public void afterStop() {
-            log.info("Container Stopped");
-            listener.afterStop();
-          }
-
-          @Override
-          public void afterFailure(Throwable t) {
-            log.info("Container Failed");
-            containerRunnerException = t;
-            listener.afterFailure(t);
-          }
-        });
-
-    ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
-    if (heartbeatMonitor != null) {
-      heartbeatMonitor.start();
-    }
-
-    container.run();
-    if (heartbeatMonitor != null) {
-      heartbeatMonitor.stop();
-    }
-
-    if (containerRunnerException != null) {
-      log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
-      System.exit(1);
-    }
-  }
-
-  private static Optional<ExternalContext> buildExternalContext(Config config) {
-    /*
-     * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
-     * a non-empty ExternalContext to SamzaContainer. Only config should be used to build the external context. In the
-     * future, components like the application descriptor may not be available to LocalContainerRunner.
-     */
-    return Optional.empty();
-  }
-
-  // TODO: this is going away when SAMZA-1168 is done and the initialization of metrics reporters are done via
-  // LocalApplicationRunner#createStreamProcessor()
-  private static Map<String, MetricsReporter> loadMetricsReporters(
-      ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId, Config config) {
-    Map<String, MetricsReporter> reporters = new HashMap<>();
-    appDesc.getMetricsReporterFactories().forEach((name, factory) ->
-        reporters.put(name, factory.getMetricsReporter(name, containerId, config)));
-    return reporters;
-  }
-
-  /**
-   * Creates a new container heartbeat monitor if possible.
-   * @param container the container to monitor
-   * @return a new {@link ContainerHeartbeatMonitor} instance, or null if could not create one
-   */
-  private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container) {
-    String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
-    String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
-    if (executionEnvContainerId != null) {
-      log.info("Got execution environment container id: {}", executionEnvContainerId);
-      return new ContainerHeartbeatMonitor(() -> {
-          try {
-            container.shutdown();
-            containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
-          } catch (Exception e) {
-            log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
-            System.exit(1);
-          }
-        }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
-    } else {
-      log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
-      return null;
-    }
+    ContainerLaunchUtil.run(appDesc, containerId, jobModel);
   }
 }