You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/04/28 21:27:56 UTC

[samza] branch master updated: SAMZA-2647: Clean up unused split deployment code (#1493)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new de067c4  SAMZA-2647: Clean up unused split deployment code (#1493)
de067c4 is described below

commit de067c48d8bbac894e0efa41cda37ebf26551e42
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Wed Apr 28 14:27:47 2021 -0700

    SAMZA-2647: Clean up unused split deployment code (#1493)
    
    Issues: Some code was added for SEP-24 some time ago (#1172, #1173), but we are not moving forward with SEP-24 because it does not cleanly handle certain use cases. Since we don't need this code, it should get removed.
    
    Changes:
    1. Removed unused flows related to job coordinator dependency isolation.
    2. Removed unused classloader separation utils.
    
    API changes and usage/upgrade instructions:
    Removed some configs and environment variables related to split deployment, but the feature wasn't complete, so those shouldn't be used by Samza jobs anyways.
---
 build.gradle                                       |   1 -
 gradle/dependency-versions.gradle                  |   1 -
 .../classloader/DependencyIsolationUtils.java      |  50 ---
 .../classloader/IsolatingClassLoaderFactory.java   | 352 ---------------------
 .../ClusterBasedJobCoordinatorRunner.java          |  13 +-
 .../java/org/apache/samza/config/JobConfig.java    |   6 -
 .../apache/samza/config/ShellCommandConfig.java    |  23 --
 .../org/apache/samza/util/SplitDeploymentUtil.java |  88 ------
 .../TestIsolatingClassLoaderFactory.java           |  96 ------
 .../org/apache/samza/config/TestJobConfig.java     |  11 -
 .../apache/samza/util/TestSplitDeploymentUtil.java |  71 -----
 samza-shell/src/main/bash/run-class.sh             |  34 +-
 .../scala/org/apache/samza/job/yarn/YarnJob.scala  |  28 +-
 .../org/apache/samza/job/yarn/TestYarnJob.java     |  45 ---
 14 files changed, 15 insertions(+), 804 deletions(-)

diff --git a/build.gradle b/build.gradle
index 9c48b23..64cef19 100644
--- a/build.gradle
+++ b/build.gradle
@@ -195,7 +195,6 @@ project(":samza-core_$scalaSuffix") {
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     compile "net.jodah:failsafe:$failsafeVersion"
-    compile "com.linkedin.cytodynamics:cytodynamics-nucleus:$cytodynamicsVersion"
     testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index e289af0..738e2d0 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -25,7 +25,6 @@
   commonsHttpClientVersion = "3.1"
   commonsIoVersion = "2.8.0"
   commonsLang3Version = "3.11"
-  cytodynamicsVersion = "0.2.0"
   elasticsearchVersion = "2.2.0"
   gsonVersion = "2.8.6"
   guavaVersion = "30.1-jre"
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java b/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
deleted file mode 100644
index a0b5d1e..0000000
--- a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-public class DependencyIsolationUtils {
-  /**
-   * Directory inside the home directory of the cluster-based job coordinator in which the framework API artifacts are
-   * placed, for usage in dependency isolation for the cluster-based job coordinator.
-   * TODO make this configurable or taken from an environment variable
-   */
-  public static final String FRAMEWORK_API_DIRECTORY = "__samzaFrameworkApi";
-
-  /**
-   * Directory inside the home directory of the cluster-based job coordinator in which the framework infrastructure
-   * artifacts are placed, for usage in dependency isolation for the cluster-based job coordinator.
-   * TODO make this configurable or taken from an environment variable
-   */
-  public static final String FRAMEWORK_INFRASTRUCTURE_DIRECTORY = "__samzaFrameworkInfrastructure";
-
-  /**
-   * Directory inside the home directory of the cluster-based job coordinator in which the application artifacts are
-   * placed, for usage in dependency isolation for the cluster-based job coordinator.
-   * TODO make this configurable or taken from an environment variable
-   */
-  public static final String APPLICATION_DIRECTORY = "__package";
-
-  /**
-   * Name of the file which contains the class names (or globs) which should be loaded from the framework API
-   * classloader.
-   */
-  public static final String FRAMEWORK_API_CLASS_LIST_FILE_NAME = "samza-framework-api-classes.txt";
-
-  public static final String RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME = "runtime-framework-resources-pathing.jar";
-}
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java b/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
deleted file mode 100644
index 344a034..0000000
--- a/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-import com.linkedin.cytodynamics.matcher.BootstrapClassPredicate;
-import com.linkedin.cytodynamics.matcher.GlobMatcher;
-import com.linkedin.cytodynamics.nucleus.DelegateRelationship;
-import com.linkedin.cytodynamics.nucleus.DelegateRelationshipBuilder;
-import com.linkedin.cytodynamics.nucleus.IsolationLevel;
-import com.linkedin.cytodynamics.nucleus.LoaderBuilder;
-import com.linkedin.cytodynamics.nucleus.OriginRestriction;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Use this to build a classloader for running Samza which isolates the Samza framework code/dependencies from the
- * application code/dependencies.
- */
-public class IsolatingClassLoaderFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(IsolatingClassLoaderFactory.class);
-
-  private static final String LIB_DIRECTORY = "lib";
-
-  /**
-   * Build a classloader which will isolate Samza framework code from application code. Samza framework classes and
-   * application-specific classes will be loaded using a different classloaders. This will enable dependencies of each
-   * category of classes to also be loaded separately, so that runtime dependency conflicts do not happen.
-   * Each call to this method will build a different instance of a classloader.
-   *
-   * Samza framework API classes need to be specified in a file called
-   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} which is in the lib directory which is in the
-   * API package. The file needs to be generated when building the framework API package. This class will not generate
-   * the file.
-   *
-   * Implementation notes:
-   *
-   * The cytodynamics isolating classloader is used for this. It provides more control than the built-in
-   * {@link URLClassLoader}. Cytodynamics provides the ability to compose multiple classloaders together and have more
-   * granular delegation strategies between the classloaders.
-   *
-   * In order to share objects between classes loaded by different classloaders, the classes for the shared objects must
-   * be loaded by a common classloader. Those common classes will be loaded through a common API classloader. The
-   * cytodynamics classloader can be set up to only use the common API classloader for an explicit set of classes. The
-   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file should include the framework API classes.
-   * Also, bootstrap classes (e.g. java.lang.String) need to be loaded by a common classloader, since objects of those
-   * types need to be shared across different framework and application. There are also some static bootstrap classes
-   * which should be shared (e.g. java.lang.System). Bootstrap classes will be loaded through a common classloader by
-   * default.
-   *
-   * These are the classloaders which are used to make up the final classloader.
-   * <ul>
-   *   <li>bootstrap classloader: Built-in Java classes (e.g. java.lang.String)</li>
-   *   <li>API classloader: Common Samza framework API classes</li>
-   *   <li>infrastructure classloader: Core Samza framework classes and plugins that are included in the framework</li>
-   *   <li>
-   *     application classloader: Application code and plugins that are needed in the app but are not included in the
-   *     framework
-   *   </li>
-   * </ul>
-   *
-   * This is the delegation structure for the classloaders:
-   * <pre>
-   *   (bootstrap               (API                  (application
-   *   classloader) &lt;---- classloader) &lt;------- classloader)
-   *                             ^                      ^
-   *                             |                     /
-   *                             |                    /
-   *                             |                   /
-   *                             |                  /
-   *                         (infrastructure classloader)
-   * </pre>
-   * The cytodynamics classloader allows control over when the delegation should happen.
-   * <ol>
-   *   <li>API classloader delegates to the bootstrap classloader if the bootstrap classloader has the class.</li>
-   *   <li>
-   *     Infrastructure classloader only delegates to the API classloader for the common classes specified by
-   *     {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
-   *   </li>
-   *   <li>
-   *     Infrastructure classloader delegates to the application classloader when a class can't be found in the
-   *     infrastructure classloader.
-   *   </li>
-   *   <li>
-   *     Application classloader only delegates to the API classloader for the common classes specified by
-   *     {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
-   *   </li>
-   * </ol>
-   */
-  public ClassLoader buildClassLoader() {
-    // start at the user.dir to find the resources for the classpaths
-    File baseJobDirectory = new File(System.getProperty("user.dir"));
-    File apiLibDirectory = libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.FRAMEWORK_API_DIRECTORY));
-    LOG.info("Using API lib directory: {}", apiLibDirectory);
-    File infrastructureLibDirectory =
-        libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY));
-    LOG.info("Using infrastructure lib directory: {}", infrastructureLibDirectory);
-    File applicationLibDirectory =
-        libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.APPLICATION_DIRECTORY));
-    LOG.info("Using application lib directory: {}", applicationLibDirectory);
-
-    ClassLoader apiClassLoader = buildApiClassLoader(apiLibDirectory);
-    ClassLoader applicationClassLoader =
-        buildApplicationClassLoader(applicationLibDirectory, apiLibDirectory, apiClassLoader);
-
-    // the classloader to return is the one with the infrastructure classpath
-    return buildInfrastructureClassLoader(infrastructureLibDirectory, baseJobDirectory, apiLibDirectory, apiClassLoader,
-        applicationClassLoader);
-  }
-
-  /**
-   * Build the {@link ClassLoader} which can load framework API classes.
-   *
-   * This sets up the link between the bootstrap classloader and the API classloader (see {@link #buildClassLoader()}.
-   */
-  private static ClassLoader buildApiClassLoader(File apiLibDirectory) {
-    /*
-     * This can just use the built-in classloading, which checks the parent classloader first and then checks its own
-     * classpath. A null parent means bootstrap classloader, which contains core Java classes (e.g. java.lang.String).
-     * This doesn't need to be isolated from the parent, because we only want to load all bootstrap classes from the
-     * bootstrap classloader.
-     */
-    return new URLClassLoader(getClasspathAsURLs(apiLibDirectory), null);
-  }
-
-  /**
-   * Build the {@link ClassLoader} which can load application classes.
-   *
-   * This sets up the link between the application classloader and the API classloader (see {@link #buildClassLoader()}.
-   */
-  private static ClassLoader buildApplicationClassLoader(File applicationLibDirectory, File apiLibDirectory,
-      ClassLoader apiClassLoader) {
-    return LoaderBuilder.anIsolatingLoader()
-        // look in application lib directory for JARs
-        .withClasspath(getClasspathAsURIs(applicationLibDirectory))
-        // getClasspathAsURIs should only return JARs within applicationLibDirectory anyways, but doing it to be safe
-        .withOriginRestriction(OriginRestriction.denyByDefault().allowingDirectory(applicationLibDirectory, false))
-        // delegate to the api classloader for API classes
-        .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
-        .build();
-  }
-
-  /**
-   * Build the {@link ClassLoader} which can load Samza framework core classes. If a file with the name
-   * {@link DependencyIsolationUtils#RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME} is found in {@code baseJobDirectory},
-   * then it will be included in the classpath.
-   * This may also fall back to loading application classes.
-   *
-   * This sets up two links: One link between the infrastructure classloader and the API and another link between the
-   * infrastructure classloader and the application classloader (see {@link #buildClassLoader()}.
-   */
-  private static ClassLoader buildInfrastructureClassLoader(File infrastructureLibDirectory,
-      File baseJobDirectory,
-      File apiLibDirectory,
-      ClassLoader apiClassLoader,
-      ClassLoader applicationClassLoader) {
-    // start with JARs in infrastructure lib directory
-    List<URI> classpathURIs = new ArrayList<>(getClasspathAsURIs(infrastructureLibDirectory));
-    OriginRestriction originRestriction = OriginRestriction.denyByDefault()
-        // getClasspathAsURIs should only return JARs within infrastructureLibDirectory anyways, but doing it to be safe
-        .allowingDirectory(infrastructureLibDirectory, false);
-    File runtimeFrameworkResourcesPathingJar =
-        new File(baseJobDirectory, DependencyIsolationUtils.RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME);
-    if (canAccess(runtimeFrameworkResourcesPathingJar)) {
-      // if there is a runtime framework resources pathing JAR, then include that in the classpath as well
-      classpathURIs.add(runtimeFrameworkResourcesPathingJar.toURI());
-      originRestriction.allowingGlobPattern(fileURL(runtimeFrameworkResourcesPathingJar).toExternalForm());
-      LOG.info("Added {} to infrastructure classpath", runtimeFrameworkResourcesPathingJar.getPath());
-    } else {
-      LOG.info("Unable to access {}, so not adding to infrastructure classpath",
-          runtimeFrameworkResourcesPathingJar.getPath());
-    }
-    return LoaderBuilder.anIsolatingLoader()
-        .withClasspath(Collections.unmodifiableList(classpathURIs))
-        .withOriginRestriction(originRestriction)
-        .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
-        /*
-         * Fall back to the application classloader for certain classes. For example, the application might implement
-         * some pluggable classes (e.g. SystemFactory). Another example is message schemas that are supplied by the
-         * application.
-         */
-        .addFallbackDelegate(DelegateRelationshipBuilder.builder()
-            .withDelegateClassLoader(applicationClassLoader)
-            /*
-             * NONE means that a class will be loaded from here if it is not found in the classpath of the loader that uses
-             * this relationship.
-             */
-            .withIsolationLevel(IsolationLevel.NONE)
-            .build())
-        .build();
-  }
-
-  /**
-   * Build a {@link DelegateRelationship} which defines how to delegate to the API classloader.
-   *
-   * Delegation will only happen for classes specified in
-   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} and the Java bootstrap classes.
-   */
-  private static DelegateRelationship buildApiParentRelationship(File apiLibDirectory, ClassLoader apiClassLoader) {
-    DelegateRelationshipBuilder apiParentRelationshipBuilder = DelegateRelationshipBuilder.builder()
-        // needs to load API classes from the API classloader
-        .withDelegateClassLoader(apiClassLoader)
-        /*
-         * FULL means to only load classes explicitly specified as "API" from the API classloader. We will use
-         * delegate-preferred class predicates to specify which classes are "API" (see below).
-         */
-        .withIsolationLevel(IsolationLevel.FULL);
-
-    // bootstrap classes need to be loaded from a common classloader
-    apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new BootstrapClassPredicate());
-    // the classes which are Samza framework API classes are added here
-    getFrameworkApiClassGlobs(apiLibDirectory).forEach(
-      apiClassName -> apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new GlobMatcher(apiClassName)));
-    return apiParentRelationshipBuilder.build();
-  }
-
-  /**
-   * Gets the globs for matching against classes to load from the framework API classloader. This will read the
-   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file in {@code directoryWithClassList} to get
-   * the globs.
-   *
-   * @param directoryWithClassList Directory in which
-   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} lives
-   * @return {@link List} of globs for matching against classes to load from the framework API classloader
-   */
-  @VisibleForTesting
-  static List<String> getFrameworkApiClassGlobs(File directoryWithClassList) {
-    File parentPreferredFile =
-        new File(directoryWithClassList, DependencyIsolationUtils.FRAMEWORK_API_CLASS_LIST_FILE_NAME);
-    validateCanAccess(parentPreferredFile);
-    try {
-      return Files.readAllLines(Paths.get(parentPreferredFile.toURI()), StandardCharsets.UTF_8)
-          .stream()
-          .filter(StringUtils::isNotBlank)
-          .collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new SamzaException("Error while reading samza-api class list", e);
-    }
-  }
-
-  /**
-   * Get the {@link URL}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
-   * not recursive.
-   */
-  @VisibleForTesting
-  static URL[] getClasspathAsURLs(File jarsLocation) {
-    validateCanAccess(jarsLocation);
-    File[] filesInJarsLocation = jarsLocation.listFiles();
-    if (filesInJarsLocation == null) {
-      throw new SamzaException(
-          String.format("Could not find any files inside %s, probably because it is not a directory",
-              jarsLocation.getPath()));
-    }
-    URL[] urls = Stream.of(filesInJarsLocation)
-        .filter(file -> file.getName().endsWith(".jar") || file.getName().endsWith(".war"))
-        .map(IsolatingClassLoaderFactory::fileURL)
-        .toArray(URL[]::new);
-    LOG.info("Found {} items to load into classpath from {}", urls.length, jarsLocation);
-    Stream.of(urls).forEach(url -> LOG.debug("Found {} from {}", url, jarsLocation));
-    return urls;
-  }
-
-  /**
-   * Get the {@link URI}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
-   * not recursive.
-   */
-  @VisibleForTesting
-  static List<URI> getClasspathAsURIs(File jarsLocation) {
-    return Stream.of(getClasspathAsURLs(jarsLocation))
-        .map(IsolatingClassLoaderFactory::urlToURI)
-        .collect(Collectors.toList());
-  }
-
-  private static boolean canAccess(File file) {
-    return file.exists() && file.canRead();
-  }
-
-  /**
-   * Makes sure that a file exists and can be read.
-   */
-  private static void validateCanAccess(File file) {
-    if (!canAccess(file)) {
-      throw new SamzaException("Unable to access file: " + file);
-    }
-  }
-
-  /**
-   * Get the {@link URL} for a {@link File}.
-   * Converts checked exceptions into {@link SamzaException}s.
-   */
-  private static URL fileURL(File file) {
-    URI uri = file.toURI();
-    try {
-      return uri.toURL();
-    } catch (MalformedURLException e) {
-      throw new SamzaException("Unable to get URL for file: " + file, e);
-    }
-  }
-
-  /**
-   * Get the {@link URI} for a {@link URL}.
-   * Converts checked exceptions into {@link SamzaException}s.
-   */
-  private static URI urlToURI(URL url) {
-    try {
-      return url.toURI();
-    } catch (URISyntaxException e) {
-      throw new SamzaException("Unable to get URI for URL: " + url, e);
-    }
-  }
-
-  /**
-   * Get the {@link File} representing the {@link #LIB_DIRECTORY} inside the given {@code file}.
-   */
-  private static File libDirectory(File file) {
-    return new File(file, LIB_DIRECTORY);
-  }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
index a152032..523742c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
@@ -26,7 +26,6 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.classloader.IsolatingClassLoaderFactory;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -35,7 +34,6 @@ import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.util.CoordinatorStreamUtil;
-import org.apache.samza.util.SplitDeploymentUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,19 +50,12 @@ public class ClusterBasedJobCoordinatorRunner {
       LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. Exiting job coordinator", exception);
       System.exit(1);
     });
-    if (!SplitDeploymentUtil.isSplitDeploymentEnabled()) {
-      // no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
-      runClusterBasedJobCoordinator(args);
-    } else {
-      SplitDeploymentUtil.runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(),
-          ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
-    }
+    runClusterBasedJobCoordinator(args);
     System.exit(0);
   }
 
   /**
-   * This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
-   * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
+   * This is the actual execution for the {@link ClusterBasedJobCoordinator}.
    */
   @VisibleForTesting
   static void runClusterBasedJobCoordinator(String[] args) {
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 021c67d..2aa5c2a 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -143,8 +143,6 @@ public class JobConfig extends MapConfig {
   public static final String COORDINATOR_STREAM_FACTORY = "job.coordinatorstream.config.factory";
   public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = "org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";
 
-  public static final String JOB_SPLIT_DEPLOYMENT_ENABLED = "job.split.deployment.enabled";
-
   private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
 
   // Enable ClusterBasedJobCoordinator aka ApplicationMaster High Availability (AM-HA).
@@ -386,10 +384,6 @@ public class JobConfig extends MapConfig {
     return getStandbyTaskReplicationFactor() > 1;
   }
 
-  public boolean isSplitDeploymentEnabled() {
-    return getBoolean(JOB_SPLIT_DEPLOYMENT_ENABLED, false);
-  }
-
   /**
    * The metadata file is written in a {@code exec-env-container-id}.metadata file in the log-dir of the container.
    * Here the {@code exec-env-container-id} refers to the ID assigned by the cluster manager (e.g., YARN) to the container,
diff --git a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
index 73093a8..73bcf8e 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
@@ -57,29 +57,6 @@ public class ShellCommandConfig extends MapConfig {
    */
   public static final String ENV_EXECUTION_ENV_CONTAINER_ID = "EXECUTION_ENV_CONTAINER_ID";
 
-  /**
-   * Set to "true" if split deployment feature is enabled. Otherwise, will be considered false.
-   *
-   * The launch process for the cluster-based job coordinator and job container depends on the value of this, since it
-   * needs to be known if the cluster-based job coordinator and job container should be launched in a split deployment
-   * mode.
-   * This needs to be an environment variable, because the value needs to be known before the full configs can be read
-   * from the metadata store (full configs are only read after launch is complete).
-   */
-  public static final String ENV_SPLIT_DEPLOYMENT_ENABLED = "ENV_SPLIT_DEPLOYMENT_ENABLED";
-
-  /**
-   * When running the cluster-based job coordinator and job container in a split deployment mode, it uses JARs and
-   * resources from a lib directory which is provided by the framework. In some cases, it is necessary to use some
-   * resources specified by the application as well. This environment variable can be set to a directory which is
-   * different from the framework lib directory in order to tell Samza where application resources live.
-   * This is an environment variable because it is needed in order to launch the cluster-based job coordinator and job
-   * container Java processes, which means access to full configs is not available yet.
-   * For example, this is used to set a system property for the location of an application-specified log4j configuration
-   * file when launching the cluster-based job coordinator and job container Java processes.
-   */
-  public static final String ENV_APPLICATION_LIB_DIR = "APPLICATION_LIB_DIR";
-
   /*
    * The base directory for storing logged data stores used in Samza. This has to be set on all machine running Samza
    * containers. For example, when using YARN, it has to be set in all NMs and passed to the containers.
diff --git a/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java b/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
deleted file mode 100644
index 200cd3c..0000000
--- a/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.ShellCommandConfig;
-
-
-public final class SplitDeploymentUtil {
-
-  /**
-   * The split deployment feature uses system env {@code ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED} to represent
-   * if the user chooses to enable it.
-   * This function helps to detect if the split deployment feature is enabled.
-   *
-   * @return true if split deployment is enabled; vice versa
-   */
-  public static boolean isSplitDeploymentEnabled() {
-    return Boolean.parseBoolean(System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
-  }
-
-  /**
-   * Execute the runner class using a separate isolated classloader.
-   * @param classLoader {@link ClassLoader} to use to load the runner class which will run
-   * @param originalRunnerClass {@link Class} for which will be executed with the new class loader.
-   * @param runMethodName run method name of runner class
-   * @param runMethodArgs arguments to pass to run method
-   */
-  public static void runWithClassLoader(ClassLoader classLoader, Class<?> originalRunnerClass, String runMethodName,
-      String[] runMethodArgs) {
-    // need to use the isolated classloader to load run method and then execute using that new class
-    Class<?> runnerClass;
-    try {
-      runnerClass = classLoader.loadClass(originalRunnerClass.getName());
-    } catch (ClassNotFoundException e) {
-      throw new SamzaException(String.format(
-          "Isolation was enabled, but unable to find %s in isolated classloader", originalRunnerClass.getName()), e);
-    }
-
-    // save the current context classloader so it can be reset after finishing the call to run method
-    ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
-    // this is needed because certain libraries (e.g. log4j) use the context classloader
-    Thread.currentThread().setContextClassLoader(classLoader);
-
-    try {
-      executeRunForRunnerClass(runnerClass, runMethodName, runMethodArgs);
-    } finally {
-      // reset the context class loader; it's good practice, and could be important when running a test suite
-      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
-    }
-  }
-
-  private static void executeRunForRunnerClass(Class<?> runnerClass, String runMethodName, String[] runMethodArgs) {
-    Method runMethod;
-    try {
-      runMethod = runnerClass.getDeclaredMethod(runMethodName, String[].class);
-    } catch (NoSuchMethodException e) {
-      throw new SamzaException(String.format("Isolation was enabled, but unable to find %s method", runMethodName), e);
-    }
-    // only sets accessible flag for this method instance
-    runMethod.setAccessible(true);
-
-    try {
-      // wrapping args in object array so that args is passed as a single argument to the method
-      runMethod.invoke(null, new Object[]{runMethodArgs});
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      throw new SamzaException(String.format("Exception while executing %s method", runMethodName), e);
-    }
-  }
-}
diff --git a/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java b/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
deleted file mode 100644
index 7444fbf..0000000
--- a/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-import java.io.File;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Set;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.samza.SamzaException;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-public class TestIsolatingClassLoaderFactory {
-  @Test
-  public void testGetApiClasses() throws URISyntaxException {
-    File apiClassListFile = Paths.get(getClass().getResource("/classloader").toURI()).toFile();
-    List<String> apiClassNames = IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(apiClassListFile);
-    List<String> expected = ImmutableList.of(
-        "org.apache.samza.JavaClass",
-        "org.apache.samza.JavaClass$InnerJavaClass",
-        "org.apache.samza.ScalaClass$",
-        "org.apache.samza.ScalaClass$$anon$1",
-        "my.package.with.wildcard.*",
-        "my.package.with.question.mark?");
-    assertEquals(expected, apiClassNames);
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testGetApiClassesFileDoesNotExist() throws URISyntaxException {
-    File nonExistentDirectory =
-        new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
-    IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(nonExistentDirectory);
-  }
-
-  @Test
-  public void testGetClasspathAsURLs() throws URISyntaxException {
-    File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
-    URL[] classpath = IsolatingClassLoaderFactory.getClasspathAsURLs(classpathDirectory);
-    assertEquals(2, classpath.length);
-    Set<URL> classpathSet = ImmutableSet.copyOf(classpath);
-    URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
-    assertTrue(classpathSet.contains(jarUrl));
-    URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
-    assertTrue(classpathSet.contains(warUrl));
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testGetClasspathAsURLsDirectoryDoesNotExist() throws URISyntaxException {
-    File nonExistentDirectory =
-        new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
-    IsolatingClassLoaderFactory.getClasspathAsURLs(nonExistentDirectory);
-  }
-
-  @Test
-  public void testGetClasspathAsURIs() throws URISyntaxException {
-    File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
-    List<URI> classpath = IsolatingClassLoaderFactory.getClasspathAsURIs(classpathDirectory);
-    assertEquals(2, classpath.size());
-    Set<URI> classpathSet = ImmutableSet.copyOf(classpath);
-    URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
-    assertTrue(classpathSet.contains(jarUrl.toURI()));
-    URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
-    assertTrue(classpathSet.contains(warUrl.toURI()));
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testGetClasspathAsURIsDirectoryDoesNotExist() throws URISyntaxException {
-    File nonExistentDirectory =
-        new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
-    IsolatingClassLoaderFactory.getClasspathAsURIs(nonExistentDirectory);
-  }
-}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 00c2d5e..af4bd1d 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -546,17 +546,6 @@ public class TestJobConfig {
   }
 
   @Test
-  public void testGetClusterBasedJobCoordinatorDependencyIsolationEnabled() {
-    Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
-    assertTrue(new JobConfig(config).isSplitDeploymentEnabled());
-
-    config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false"));
-    assertFalse(new JobConfig(config).isSplitDeploymentEnabled());
-
-    assertFalse(new JobConfig(new MapConfig()).isSplitDeploymentEnabled());
-  }
-
-  @Test
   public void testGetMetadataFile() {
     String execEnvContainerId = "container-id";
     String containerMetadataDirectory = "/tmp/samza/log/dir";
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
deleted file mode 100644
index 72772ba..0000000
--- a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import org.apache.samza.clustermanager.ClusterBasedJobCoordinatorRunner;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.*;
-import static org.mockito.AdditionalMatchers.*;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.*;
-
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ClusterBasedJobCoordinatorRunner.class})
-public class TestSplitDeploymentUtil {
-
-  @Test
-  public void testRunWithIsolatingClassLoader() throws Exception {
-    // partially mock ClusterBasedJobCoordinator (mock runClusterBasedJobCoordinator method only)
-    PowerMockito.spy(ClusterBasedJobCoordinatorRunner.class);
-    // save the context classloader to make sure that it gets set properly once the test is finished
-    ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
-    ClassLoader classLoader = mock(ClassLoader.class);
-    String[] args = new String[]{"arg0", "arg1"};
-    doReturn(ClusterBasedJobCoordinatorRunner.class).when(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
-
-    // stub the private static method which is called by reflection
-    PowerMockito.doAnswer(invocation -> {
-        // make sure the only calls to this method has the expected arguments
-      assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
-      // checks that the context classloader is set correctly
-      assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
-      return null;
-    }).when(ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", any());
-
-    try {
-      SplitDeploymentUtil.runWithClassLoader(classLoader,
-          ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
-      assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
-    } finally {
-      // reset it explicitly just in case runWithClassLoader throws an exception
-      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
-    }
-    // make sure that the classloader got used
-    verify(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
-    // make sure runClusterBasedJobCoordinator only got called once
-    verifyPrivate(ClusterBasedJobCoordinatorRunner.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
-  }
-}
diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh
index 9b5ac30..2dc1ec9 100755
--- a/samza-shell/src/main/bash/run-class.sh
+++ b/samza-shell/src/main/bash/run-class.sh
@@ -53,13 +53,13 @@ export APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
 echo APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
 echo BASE_LIB_DIR=$BASE_LIB_DIR
 
-BASE_LIB_CLASSPATH=""
+CLASSPATH=""
 # all the jars need to be appended on newlines to ensure line argument length of 72 bytes is not violated
 for file in $BASE_LIB_DIR/*.[jw]ar;
 do
-  BASE_LIB_CLASSPATH=$BASE_LIB_CLASSPATH" $file \n"
+  CLASSPATH=$CLASSPATH" $file \n"
 done
-echo generated from BASE_LIB_DIR BASE_LIB_CLASSPATH=$BASE_LIB_CLASSPATH
+echo generated from BASE_LIB_DIR CLASSPATH=$CLASSPATH
 
 # In some cases (AWS) $JAVA_HOME/bin doesn't contain jar.
 if [ -z "$JAVA_HOME" ] || [ ! -e "$JAVA_HOME/bin/jar" ]; then
@@ -68,23 +68,10 @@ else
   JAR="$JAVA_HOME/bin/jar"
 fi
 
-# Create a pathing JAR for the JARs in the BASE_LIB_DIR
 # Newlines and spaces are intended to ensure proper parsing of manifest in pathing jar
-printf "Class-Path: \n $BASE_LIB_CLASSPATH \n" > base-lib-manifest.txt
-# Creates a new archive and adds custom manifest information to base-lib-pathing.jar
-eval "$JAR -cvmf base-lib-manifest.txt base-lib-pathing.jar"
-
-# Create a pathing JAR for the runtime framework resources. It is useful to separate this from the base-lib-pathing.jar
-# because the split deployment framework may only need the resources from this runtime pathing JAR.
-if ! [[ $HADOOP_CONF_DIR =~ .*/$ ]]; then
-  # manifest requires a directory to have a trailing slash
-  HADOOP_CONF_DIR="$HADOOP_CONF_DIR/"
-fi
-# HADOOP_CONF_DIR should be supplied to classpath explicitly for Yarn to parse configs
-RUNTIME_FRAMEWORK_RESOURCES_CLASSPATH="$HADOOP_CONF_DIR \n"
-# TODO add JARs from ADDITIONAL_CLASSPATH_DIR to runtime-framework-resources-pathing.jar as well
-printf "Class-Path: \n $RUNTIME_FRAMEWORK_RESOURCES_CLASSPATH \n" > runtime-framework-resources-manifest.txt
-eval "$JAR -cvmf runtime-framework-resources-manifest.txt runtime-framework-resources-pathing.jar"
+printf "Class-Path: \n $CLASSPATH \n" > manifest.txt
+# Creates a new archive and adds custom manifest information to pathing.jar
+eval "$JAR -cvmf manifest.txt pathing.jar"
 
 if [ -z "$JAVA_HOME" ]; then
   JAVA="java"
@@ -163,11 +150,12 @@ fi
 # Check if 64 bit is set. If not - try and set it if it's supported
 [[ $JAVA_OPTS != *-d64* ]] && check_and_enable_64_bit_mode
 
-echo $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar "$@"
+# HADOOP_CONF_DIR should be supplied to classpath explicitly for Yarn to parse configs
+echo $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:pathing.jar "$@"
 
 ## If localized resource lib directory is defined, then include it in the classpath.
 if [[ -z "${ADDITIONAL_CLASSPATH_DIR}" ]]; then
-  exec $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar "$@"
+  exec $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:pathing.jar "$@"
 else
-  exec $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar:$ADDITIONAL_CLASSPATH_DIR "$@"
-fi
\ No newline at end of file
+  exec $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:pathing.jar:$ADDITIONAL_CLASSPATH_DIR "$@"
+fi
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 7e4565b..237667d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -19,14 +19,11 @@
 
 package org.apache.samza.job.yarn
 
-import java.lang.Boolean
-
 import com.google.common.annotations.VisibleForTesting
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.samza.SamzaException
-import org.apache.samza.classloader.DependencyIsolationUtils
 import org.apache.samza.config.{Config, JobConfig, ShellCommandConfig, YarnConfig}
 import org.apache.samza.job.ApplicationStatus.{SuccessfulFinish, UnsuccessfulFinish}
 import org.apache.samza.job.{ApplicationStatus, StreamJob}
@@ -46,7 +43,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob wit
   def submit: YarnJob = {
     try {
       val jobConfig = new JobConfig(config)
-      val cmdExec = YarnJob.buildJobCoordinatorCmd(config, jobConfig)
+      val cmdExec = "./__package/bin/run-jc.sh"
       val environment = YarnJob.buildEnvironment(config, this.yarnConfig, jobConfig)
 
       appId = client.submitApplication(
@@ -184,13 +181,6 @@ object YarnJob extends Logging {
         Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig))
     }
     envMapBuilder += ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(yarnConfig.getAmOpts)
-    val splitDeploymentEnabled = jobConfig.isSplitDeploymentEnabled
-    envMapBuilder += ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED -> Util.envVarEscape(Boolean.toString(splitDeploymentEnabled))
-    if (splitDeploymentEnabled) {
-      //split deployment is enabled, so need to specify where the application lib directory is for app resources
-      envMapBuilder += ShellCommandConfig.ENV_APPLICATION_LIB_DIR ->
-        Util.envVarEscape(String.format("./%s/lib", DependencyIsolationUtils.APPLICATION_DIRECTORY))
-    }
     Option.apply(yarnConfig.getAMJavaHome).foreach {
       amJavaHome => envMapBuilder += ShellCommandConfig.ENV_JAVA_HOME -> amJavaHome
     }
@@ -198,18 +188,4 @@ object YarnJob extends Logging {
       Util.envVarEscape(config.get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, ""))
     envMapBuilder.result()
   }
-
-  /**
-    * Build the command for the job coordinator execution.
-    * Passing multiple separate config objects so that they can be reused in other places.
-    */
-  @VisibleForTesting
-  private[yarn] def buildJobCoordinatorCmd(config: Config, jobConfig: JobConfig): String = {
-    var cmdExec = "./__package/bin/run-jc.sh" // default location
-    if (jobConfig.isSplitDeploymentEnabled) {
-      cmdExec = "./%s/bin/run-jc.sh" format DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY
-      logger.info("Using isolated cluster-based job coordinator path: %s" format cmdExec)
-    }
-    cmdExec
-  }
-}
\ No newline at end of file
+}
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
index f068800..4e2c4a7 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
@@ -21,7 +21,6 @@ package org.apache.samza.job.yarn;
 import java.io.IOException;
 import java.util.Map;
 import com.google.common.collect.ImmutableMap;
-import org.apache.samza.classloader.DependencyIsolationUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -38,19 +37,6 @@ import static org.junit.Assert.assertEquals;
 
 public class TestYarnJob {
   @Test
-  public void testBuildJobCoordinatorCmd() {
-    // cluster-based job coordinator dependency isolation is not enabled; use script from __package directory
-    Config config = new MapConfig();
-    assertEquals("./__package/bin/run-jc.sh", YarnJob$.MODULE$.buildJobCoordinatorCmd(config, new JobConfig(config)));
-
-    // split deployment is enabled; use script from framework infrastructure directory
-    Config splitDeploymentEnabled =
-        new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
-    assertEquals(String.format("./%s/bin/run-jc.sh", DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY),
-        YarnJob$.MODULE$.buildJobCoordinatorCmd(splitDeploymentEnabled, new JobConfig(splitDeploymentEnabled)));
-  }
-
-  @Test
   public void testBuildEnvironment() throws IOException {
     String amJvmOptions = "-Xmx1g -Dconfig.key='config value'";
     Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
@@ -58,35 +44,12 @@ public class TestYarnJob {
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
         .put(YarnConfig.AM_JVM_OPTIONS, amJvmOptions) // needs escaping
-        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
         .build());
     String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
         .writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
         ShellCommandConfig.ENV_JAVA_OPTS, Util.envVarEscape(amJvmOptions),
-        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false",
-        ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
-    assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
-        YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
-  }
-
-  @Test
-  public void testBuildEnvironmentJobCoordinatorDependencyIsolationEnabled() throws IOException {
-    Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
-        .put(JobConfig.JOB_NAME, "jobName")
-        .put(JobConfig.JOB_ID, "jobId")
-        .put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
-        .put(YarnConfig.AM_JVM_OPTIONS, "")
-        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
-        .build());
-    String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
-        .writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
-    Map<String, String> expected = ImmutableMap.of(
-        ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
-        ShellCommandConfig.ENV_JAVA_OPTS, "",
-        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
-        ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
         ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
@@ -99,7 +62,6 @@ public class TestYarnJob {
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
         .put(YarnConfig.AM_JVM_OPTIONS, "")
-        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
         .put(YarnConfig.AM_JAVA_HOME, "/some/path/to/java/home")
         .build());
     String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
@@ -107,7 +69,6 @@ public class TestYarnJob {
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
         ShellCommandConfig.ENV_JAVA_OPTS, "",
-        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false",
         ShellCommandConfig.ENV_JAVA_HOME, "/some/path/to/java/home",
         ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
@@ -121,15 +82,12 @@ public class TestYarnJob {
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory")
         .put(YarnConfig.AM_JVM_OPTIONS, "")
-        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
         .build());
     String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
         .writeValueAsString(config));
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
         ShellCommandConfig.ENV_JAVA_OPTS, "",
-        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
-        ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
         ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
@@ -142,7 +100,6 @@ public class TestYarnJob {
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory")
         .put(YarnConfig.AM_JVM_OPTIONS, "")
-        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
         .put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "./sqlapp/lib/*")
         .build());
     String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
@@ -150,8 +107,6 @@ public class TestYarnJob {
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
         ShellCommandConfig.ENV_JAVA_OPTS, "",
-        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
-        ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
         ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "./sqlapp/lib/*");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());