Posted to commits@samza.apache.org by ca...@apache.org on 2019/10/14 18:36:18 UTC

[samza] branch master updated: SAMZA-2343: Remove original split deployment code (#1182)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 5660b86  SAMZA-2343: Remove original split deployment code (#1182)
5660b86 is described below

commit 5660b8682a0c4ff1556f7f91db2363ee5170b7af
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Mon Oct 14 11:35:56 2019 -0700

    SAMZA-2343: Remove original split deployment code (#1182)
---
 .../versioned/jobs/split-deployment.md             | 59 ----------------------
 .../java/org/apache/samza/config/JobConfig.java    | 19 -------
 .../apache/samza/job/local/ProcessJobFactory.scala |  6 ---
 .../TestContainerProcessManager.java               | 25 ---------
 .../org/apache/samza/config/TestJobConfig.java     | 17 -------
 .../apache/samza/job/TestShellCommandBuilder.scala |  6 +--
 samza-shell/src/main/bash/run-class.sh             | 42 +++------------
 .../samza/job/yarn/YarnClusterResourceManager.java | 29 +++--------
 .../scala/org/apache/samza/job/yarn/YarnJob.scala  | 23 +--------
 9 files changed, 17 insertions(+), 209 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/split-deployment.md b/docs/learn/documentation/versioned/jobs/split-deployment.md
deleted file mode 100644
index fa3e7ae..0000000
--- a/docs/learn/documentation/versioned/jobs/split-deployment.md
+++ /dev/null
@@ -1,59 +0,0 @@
----
-layout: page
-title: Separating Samza Framework and Jobs Deployment
----
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-
-### Motivation
-Currently all Samza jobs are deployed as a single unit/package which combines all the Samza libraries, user code and configs together. Typically in a large organization the team that manages the Samza cluster is not the same as the teams that are running applications on top of Samza. In this case, the current way of deployment presents two major problems:
-
-* **Samza software releases**:
-Every time Samza team releases a new version (for example a bug fix), the only way to deploy it is to rebuild all users packages and redeploy them. It would be much more efficient if the team could release the Samza framework separately, at its own cadence, and a simple job restart would pick up the new version.
-
-* **Packages incompatibilities**:
-If both Samza and a job depend on the same software, but on different (especially backward incompatible) versions, they cannot be released together, because it will most likely cause some runtime issue. Ideally, each one of them would load the packages it needs separately.
-<b>NOTE.</b>This problem is not addressed here.
-
-To address the first problem, we separate the deployment of the Samza framework from user jobs by defining two deployable units:
-
-* **Samza framework** - This contains Samza libraries only, and is deployed separately to all the machines in a cluster.
-* **User's job** - This contains user code only, and uses the pre-deployed Samza framework to run.
-
-Split deployment allows upgrading the Samza framework without forcing developers to explicitly upgrade their running applications. It also allows different versions of Samza framework with simple config changes. This means we can support canary, upgrade and rollback scenarios commonly
-required in organizations that run tens or hundreds of jobs.
-
-### Deployment sequence
-
-#### Pre-requisite for split deployment
-Each deployment will now consist of two separate packages:<p>
-
-1. **Samza framework** - This includes all Samza libraries and scripts, such as samza-api, samza-core, samza-log4j, samza-kafka, samza-yarn, samza-kv, samza-kv-inmemory, samza-kv-rocksdb, samza-shell, samza-hdfs and all their dependencies.
-2. **User's job** - This includes the job package: all user code for the StreamTask implementation, configs, and other libraries required by the job. The job's package should depend only samza-api and no other Samza libraries. The package won't be able to start by itself. In order to start, it will need to use the Samza framework.
-
-#### Deployment steps
-To run a job in split deployment mode:
-
-1. **Deploy the framework**:
-The Samza framework package should be deployed to ALL the machines of a cluster into a predefined, fixed location. This could be done by merely copying the jars, or creating a meta package that would deploy all of them. Let's assume that 'samza-framework' package is installed into the '/.../samza-fwk/0.12.0' directory.
-
-2. **Create symbolic link**:
-A symbolic link needs to be created for the **stable** version of the framework to point to the framework location, e.g.: {% highlight bash %} ln -s /.../samza-fwk/0.12.0 /.../samza-fwk/STABLE' {% endhighlight %}
-
-3. **Deploy user job**:
-In the job's config, the following property is required to enable split deployment, e.g. for Samza framework path at '/.../samza-fwk': {% highlight jproperties %} samza.fwk.path=/.../samza-fwk {% endhighlight %} By default Samza will look for the **stable** link inside the folder to find the framework. You can also override the version by configuring: {% highlight jproperties %} samza.fwk.version=0.11.1 {% endhighlight %} In this case Samza will pick '/.../samza-fwk/0.11.1' as the framew [...]
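For reference, here is a standalone sketch of the resolution rule that the deleted page describes and that `JobConfig.getFwkPath` implemented. The result is `samza.fwk.path` plus `samza.fwk.version`, with the version falling back to the `STABLE` symlink, or an empty string when no path is configured. A plain `Map` stands in for Samza's `Config`. The class name `FwkPathSketch` and the `/opt/samza-fwk` path are hypothetical.

```java
import java.io.File;
import java.util.Map;

// Hypothetical standalone sketch of the removed JobConfig.getFwkPath logic;
// a plain Map stands in for org.apache.samza.config.Config.
public class FwkPathSketch {
  static final String SAMZA_FWK_PATH = "samza.fwk.path";
  static final String SAMZA_FWK_VERSION = "samza.fwk.version";

  // Returns "<path><sep><version>", defaulting the version to the STABLE symlink,
  // or "" when split deployment is not configured.
  static String getFwkPath(Map<String, String> config) {
    String fwkPath = config.getOrDefault(SAMZA_FWK_PATH, "");
    String fwkVersion = config.get(SAMZA_FWK_VERSION);
    if (fwkVersion == null || fwkVersion.isEmpty()) {
      fwkVersion = "STABLE";
    }
    if (!fwkPath.isEmpty()) {
      fwkPath = fwkPath + File.separator + fwkVersion;
    }
    return fwkPath;
  }

  public static void main(String[] args) {
    // Path only: the STABLE link is used.
    System.out.println(getFwkPath(Map.of(SAMZA_FWK_PATH, "/opt/samza-fwk")));
    // Path and explicit version override.
    System.out.println(getFwkPath(Map.of(SAMZA_FWK_PATH, "/opt/samza-fwk", SAMZA_FWK_VERSION, "0.11.1")));
    // No path: split deployment disabled.
    System.out.println(getFwkPath(Map.of()).isEmpty());
  }
}
```

After this commit neither property is read any more, so a job that still sets them simply runs from its own `__package` directory.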
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index efff572..581aad4 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -51,9 +51,6 @@ public class JobConfig extends MapConfig {
   public static final String JOB_ID = "job.id";
   static final String DEFAULT_JOB_ID = "1";
 
-  public static final String SAMZA_FWK_PATH = "samza.fwk.path";
-  public static final String SAMZA_FWK_VERSION = "samza.fwk.version";
-
   public static final String JOB_COORDINATOR_SYSTEM = "job.coordinator.system";
   public static final String JOB_DEFAULT_SYSTEM = "job.default.system";
 
@@ -315,22 +312,6 @@ public class JobConfig extends MapConfig {
   }
 
   /**
-   * Reads the config to figure out if split deployment is enabled and fwk directory is setup
-   * @return fwk + "/" + version, or empty string if fwk path is not specified
-   */
-  public static String getFwkPath(Config config) {
-    String fwkPath = config.get(SAMZA_FWK_PATH, "");
-    String fwkVersion = config.get(SAMZA_FWK_VERSION);
-    if (fwkVersion == null || fwkVersion.isEmpty()) {
-      fwkVersion = "STABLE";
-    }
-    if (!fwkPath.isEmpty()) {
-      fwkPath = fwkPath + File.separator + fwkVersion;
-    }
-    return fwkPath;
-  }
-
-  /**
    * The metadata file is written in a {@code exec-env-container-id}.metadata file in the log-dir of the container.
    * Here the {@code exec-env-container-id} refers to the ID assigned by the cluster manager (e.g., YARN) to the container,
    * which uniquely identifies a container's lifecycle.
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index b1d3215..36f1457 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -81,11 +81,6 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
       startpointManager.stop()
     }
 
-    val containerModel = coordinator.jobModel.getContainers.get(0)
-
-    val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is configured
-    info("Process job. using fwkPath = " + fwkPath)
-
     val taskConfig = new TaskConfig(config)
     val commandBuilderClass = taskConfig.getCommandClass(classOf[ShellCommandBuilder].getName)
     info("Using command builder class %s" format commandBuilderClass)
@@ -98,7 +93,6 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
       .setConfig(config)
       .setId("0")
       .setUrl(coordinator.server.getUrl)
-      .setCommandPath(fwkPath)
 
     new ProcessJob(commandBuilder, coordinator)
   }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 827bddf..1ee68aa 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.JobModelManager;
@@ -860,30 +859,6 @@ public class TestContainerProcessManager {
     cpm.stop();
   }
 
-  @Test
-  public void testAppMasterWithFwk() {
-    Config conf = getConfig();
-    SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
-    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
-    MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-
-
-    ContainerProcessManager cpm =
-        buildContainerProcessManager(new ClusterManagerConfig(conf), state, clusterResourceManager, Optional.empty());
-    cpm.start();
-    SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
-    assertFalse(cpm.shouldShutdown());
-    cpm.onResourceAllocated(container2);
-
-    configVals.put(JobConfig.SAMZA_FWK_PATH, "/export/content/whatever");
-    Config config1 = new MapConfig(configVals);
-
-    ContainerProcessManager cpm1 =
-        buildContainerProcessManager(new ClusterManagerConfig(config), state, clusterResourceManager, Optional.empty());
-    cpm1.start();
-    cpm1.onResourceAllocated(container2);
-  }
-
   @After
   public void teardown() {
     server.stop();
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index dab0f77..2e19773 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -548,23 +548,6 @@ public class TestJobConfig {
   }
 
   @Test
-  public void testGetFwkPath() {
-    String samzaFwkPath = "/path/to/samza/fwk", samzaFwkVersion = "1.2.3";
-
-    // has path and version
-    Config config = new MapConfig(
-        ImmutableMap.of(JobConfig.SAMZA_FWK_PATH, samzaFwkPath, JobConfig.SAMZA_FWK_VERSION, samzaFwkVersion));
-    assertEquals(samzaFwkPath + File.separator + samzaFwkVersion, JobConfig.getFwkPath(config));
-
-    // only has path; use STABLE for version
-    config = new MapConfig(ImmutableMap.of(JobConfig.SAMZA_FWK_PATH, samzaFwkPath));
-    assertEquals(samzaFwkPath + File.separator + "STABLE", JobConfig.getFwkPath(config));
-
-    // no path; return empty string
-    assertTrue(JobConfig.getFwkPath(new MapConfig()).isEmpty());
-  }
-
-  @Test
   public void testGetMetadataFile() {
     String execEnvContainerId = "container-id";
     String containerMetadataDirectory = "/tmp/samza/log/dir";
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
index 4df53fd..c70af8d 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
@@ -44,7 +44,7 @@ class TestShellCommandBuilder {
 
   // if cmdPath is specified, the full path to the command should be adjusted
   @Test
-  def testCommandWithFwkPath {
+  def testBuildCommandWithCommandPath {
     val urlStr = "http://www.linkedin.com"
     val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
     val scb = new ShellCommandBuilder
@@ -54,8 +54,8 @@ class TestShellCommandBuilder {
     val command = scb.buildCommand
     assertEquals("foo", command)
 
-    scb.setCommandPath("/fwk/path")
+    scb.setCommandPath("/package/path")
     val command1 = scb.buildCommand
-    assertEquals("/fwk/path/foo", command1)
+    assertEquals("/package/path/foo", command1)
   }
 }
\ No newline at end of file
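With the framework path gone, `setCommandPath` acts only as a generic prefix for the launch command. The YARN code later in this diff passes `./__package/`. The renamed test checks exactly two cases. The hypothetical helper below reproduces only those two cases. It is not Samza's `ShellCommandBuilder`, whose internals are not part of this diff.

```java
// Hypothetical helper mirroring only what testBuildCommandWithCommandPath asserts;
// it is not Samza's ShellCommandBuilder.
public class CommandPathSketch {
  static String buildCommand(String commandPath, String command) {
    // No command path: run the configured command as-is.
    if (commandPath == null || commandPath.isEmpty()) {
      return command;
    }
    // Otherwise prefix it; "/" is used because the result is a shell command line.
    return commandPath + "/" + command;
  }

  public static void main(String[] args) {
    System.out.println(buildCommand(null, "foo"));
    System.out.println(buildCommand("/package/path", "foo"));
  }
}
```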
diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh
index cd50df3..5b3b7d1 100755
--- a/samza-shell/src/main/bash/run-class.sh
+++ b/samza-shell/src/main/bash/run-class.sh
@@ -42,44 +42,16 @@ GC_LOG_ROTATION_OPTS="-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GC
 DEFAULT_LOG4J_FILE=$base_dir/lib/log4j.xml
 DEFAULT_LOG4J2_FILE=$base_dir/lib/log4j2.xml
 BASE_LIB_DIR="$base_dir/lib"
-# JOB_LIB_DIR will be set for yarn container in ContainerUtil.java
-# for others we set it to home_dir/lib
-JOB_LIB_DIR="${JOB_LIB_DIR:-$base_dir/lib}"
 
-export JOB_LIB_DIR=$JOB_LIB_DIR
-
-echo JOB_LIB_DIR=$JOB_LIB_DIR
 echo BASE_LIB_DIR=$BASE_LIB_DIR
+
 CLASSPATH=""
-if [ -d "$JOB_LIB_DIR" ] && [ "$JOB_LIB_DIR" != "$BASE_LIB_DIR" ]; then
-  # build a common classpath
-  # this class path will contain all the jars from the framework and the job's libs.
-  # in case of different version of the same lib - we pick the highest
-
-  #all jars from the fwk
-  base_jars=`ls $BASE_LIB_DIR/*.[jw]ar`
-  #all jars from the job
-  job_jars=`for file in $JOB_LIB_DIR/*.[jw]ar; do name=\`basename $file\`; if [[ $base_jars != *"$name"* ]]; then echo "$file"; fi; done`
-  # get all lib jars and reverse sort it by versions
-  all_jars=`for file in $base_jars $job_jars; do echo \`basename $file|sed 's/.*[-]\([0-9]\+\..*\)[jw]ar$/\1/'\` $file; done|sort -t. -k 1,1nr -k 2,2nr -k 3,3nr -k 4,4nr|awk '{print $2}'`
-  # generate the class path based on the sorted result, all the jars need to be appended on newlines
-  # to ensure java argument length of 72 bytes is not violated
-  for jar in $all_jars; do CLASSPATH=$CLASSPATH" $jar \n"; done
-
-  # for debug only
-  echo base_jars=$base_jars
-  echo job_jars=$job_jars
-  echo all_jars=$all_jars
-  echo generated combined CLASSPATH=$CLASSPATH
-else
-  # default behavior, all the jars need to be appended on newlines
-  # to ensure line argument length of 72 bytes is not violated
-  for file in $BASE_LIB_DIR/*.[jw]ar;
-  do
-    CLASSPATH=$CLASSPATH" $file \n"
-  done
-  echo generated from BASE_LIB_DIR CLASSPATH=$CLASSPATH
-fi
+# all the jars need to be appended on newlines to ensure line argument length of 72 bytes is not violated
+for file in $BASE_LIB_DIR/*.[jw]ar;
+do
+  CLASSPATH=$CLASSPATH" $file \n"
+done
+echo generated from BASE_LIB_DIR CLASSPATH=$CLASSPATH
 
 # In some cases (AWS) $JAVA_HOME/bin doesn't contain jar.
 if [ -z "$JAVA_HOME" ] || [ ! -e "$JAVA_HOME/bin/jar" ]; then
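The 72-byte limit in the new comment is the JAR manifest line-length limit. The `jar` lookup right after the loop suggests that `run-class.sh` writes `CLASSPATH` into a manifest `Class-Path` entry, but the rest of the script is not part of this diff, so that is an assumption here. Under that assumption, the same result can be sketched in Java with `java.util.jar.Manifest`, which wraps long values into continuation lines itself. The class name and jar paths are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

// Hypothetical sketch: a manifest whose Class-Path lists every jar/war in a lib dir.
public class ManifestClasspathSketch {

  // Java counterpart of `for file in $BASE_LIB_DIR/*.[jw]ar`.
  static List<String> listJars(Path libDir) throws IOException {
    List<String> jars = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(libDir, "*.{jar,war}")) {
      for (Path jar : stream) {
        jars.add(jar.toString());
      }
    }
    // DirectoryStream order is unspecified; sort for a stable order, roughly like a shell glob.
    jars.sort(null);
    return jars;
  }

  // Serializes the manifest; Manifest.write() splits lines longer than 72 bytes
  // into continuation lines that start with a single space.
  static String manifestFor(List<String> jars) {
    Manifest manifest = new Manifest();
    Attributes main = manifest.getMainAttributes();
    // Without Manifest-Version, write() emits no main attributes at all.
    main.put(Attributes.Name.MANIFEST_VERSION, "1.0");
    main.put(Attributes.Name.CLASS_PATH, String.join(" ", jars));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      manifest.write(out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  }

  // Parses manifest text back and returns the rejoined Class-Path value.
  static String readClassPath(String manifestText) {
    try {
      Manifest manifest = new Manifest(new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
      return manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    // Pass a lib directory as the first argument, or fall back to hypothetical jar paths.
    List<String> jars = args.length > 0
        ? listJars(Paths.get(args[0]))
        : Arrays.asList("/opt/samza/lib/samza-api.jar", "/opt/samza/lib/samza-core.jar");
    System.out.print(manifestFor(jars));
  }
}
```

Letting `Manifest.write()` handle wrapping avoids depending on each jar path being short enough to fit on its own continuation line.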
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index eade44d..14cb68f 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -42,7 +42,6 @@ import org.apache.samza.clustermanager.SamzaApplicationState;
 import org.apache.samza.clustermanager.ProcessorLaunchException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.YarnConfig;
 import org.apache.samza.coordinator.JobModelManager;
@@ -585,15 +584,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
    */
   public void runProcessor(String processorId, Container container, CommandBuilder cmdBuilder) throws IOException {
     String containerIdStr = ConverterUtils.toString(container.getId());
-    // check if we have framework path specified. If yes - use it, if not use default ./__package/
-    String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries
     String cmdPath = "./__package/";
-
-    String fwkPath = JobConfig.getFwkPath(this.config);
-    if(fwkPath != null && (! fwkPath.isEmpty())) {
-      cmdPath = fwkPath;
-      jobLib = "export JOB_LIB_DIR=./__package/lib";
-    }
     cmdBuilder.setCommandPath(cmdPath);
     String command = cmdBuilder.buildCommand();
 
@@ -601,8 +592,9 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
 
     Path packagePath = new Path(yarnConfig.getPackagePath());
-    String formattedCommand = getFormattedCommand(ApplicationConstants.LOG_DIR_EXPANSION_VAR, jobLib, command,
-        ApplicationConstants.STDOUT, ApplicationConstants.STDERR);
+    String formattedCommand =
+        getFormattedCommand(ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT,
+            ApplicationConstants.STDERR);
 
     log.info("Running Processor ID: {} on Container ID: {} on host: {} using command: {} and env: {} and package path: {}",
         processorId, containerIdStr, container.getNodeHttpAddress(), formattedCommand, env, packagePath);
@@ -696,18 +688,9 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   }
 
 
-  private String getFormattedCommand(String logDirExpansionVar,
-                                     String jobLib,
-                                     String command,
-                                     String stdOut,
-                                     String stdErr) {
-    if (!jobLib.isEmpty()) {
-      jobLib = "&& " + jobLib; // add job's libraries exported to an env variable
-    }
-
-    return String
-        .format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar,
-            jobLib, logDirExpansionVar, command, stdOut, stdErr);
+  private String getFormattedCommand(String logDirExpansionVar, String command, String stdOut, String stdErr) {
+    return String.format("export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s",
+        logDirExpansionVar, logDirExpansionVar, command, stdOut, stdErr);
   }
 
   /**
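Once `jobLib` is removed, the container launch line is a fixed template. The following standalone sketch reproduces the simplified `getFormattedCommand`. The constants are the literal values of Hadoop's `ApplicationConstants.LOG_DIR_EXPANSION_VAR`, `STDOUT` and `STDERR`, inlined to avoid a Hadoop dependency. The command string is a hypothetical example; in Samza it comes from `CommandBuilder.buildCommand()`.

```java
// Standalone sketch of the simplified YarnClusterResourceManager.getFormattedCommand.
public class FormattedCommandSketch {
  // Literal values of Hadoop's ApplicationConstants, inlined for the sketch.
  static final String LOG_DIR_EXPANSION_VAR = "<LOG_DIR>";
  static final String STDOUT = "stdout";
  static final String STDERR = "stderr";

  static String getFormattedCommand(String logDirExpansionVar, String command, String stdOut, String stdErr) {
    // Export the log dir, symlink it as ./logs, then exec the command with redirected output.
    return String.format("export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s",
        logDirExpansionVar, logDirExpansionVar, command, stdOut, stdErr);
  }

  public static void main(String[] args) {
    // Hypothetical command for illustration.
    String command = "./__package/bin/run-container.sh";
    System.out.println(getFormattedCommand(LOG_DIR_EXPANSION_VAR, command, STDOUT, STDERR));
    // prints: export SAMZA_LOG_DIR=<LOG_DIR> && ln -sfn <LOG_DIR> logs && exec ./__package/bin/run-container.sh 1>logs/stdout 2>logs/stderr
  }
}
```

The old template had an extra `%s` slot for the optional `&& export JOB_LIB_DIR=...` fragment, which left a stray space when it was empty. The container command now uses the same template that `YarnJob` already uses for the application master.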
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index b363590..ee6a498 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -48,9 +48,6 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
       appId = client.submitApplication(
         config,
         List(
-          // we need something like this:
-          //"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec <fwk_path>/bin/run-am.sh 1>logs/%s 2>logs/%s"
-
           "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s"
             format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR,
             cmdExec, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),
@@ -88,25 +85,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
   }
 
   def buildAmCmd() =  {
-    // figure out if we have framework is deployed into a separate location
-    val fwkPath = config.get(JobConfig.SAMZA_FWK_PATH, "")
-    var fwkVersion = config.get(JobConfig.SAMZA_FWK_VERSION)
-    if (fwkVersion == null || fwkVersion.isEmpty()) {
-      fwkVersion = "STABLE"
-    }
-    logger.info("Inside YarnJob: fwk_path is %s, ver is %s use it directly " format(fwkPath, fwkVersion))
-
-    var cmdExec = "./__package/bin/run-jc.sh" // default location
-
-    if (!fwkPath.isEmpty()) {
-      // if we have framework installed as a separate package - use it
-      cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-jc.sh"
-
-      logger.info("Using FWK path: " + "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s".
-             format(ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, cmdExec,
-                    ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
-
-    }
+    val cmdExec = "./__package/bin/run-jc.sh" // default location
     cmdExec
   }