You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2019/11/26 19:13:35 UTC

[samza] branch master updated: SAMZA-2333: [AM isolation] Use cytodynamics classloader to launch job coordinator (#1173)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3832bf8  SAMZA-2333: [AM isolation] Use cytodynamics classloader to launch job coordinator (#1173)
3832bf8 is described below

commit 3832bf86b1a9d6f1c171bab3dff80b3ac91f59a6
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Tue Nov 26 11:13:28 2019 -0800

    SAMZA-2333: [AM isolation] Use cytodynamics classloader to launch job coordinator (#1173)
---
 build.gradle                                       |   2 +
 gradle/dependency-versions.gradle                  |   3 +-
 .../classloader/DependencyIsolationUtils.java      |   8 +
 .../classloader/IsolatingClassLoaderFactory.java   | 327 +++++++++++++++++++++
 .../clustermanager/ClusterBasedJobCoordinator.java |  77 ++++-
 .../TestIsolatingClassLoaderFactory.java           |  96 ++++++
 .../TestClusterBasedJobCoordinator.java            |  40 ++-
 .../classloader/classpath/placeholder-jar.jar      |   1 +
 .../classloader/classpath/placeholder-txt.json     |   1 +
 .../classloader/classpath/placeholder-war.war      |   1 +
 .../classloader/samza-framework-api-classes.txt    |   6 +
 samza-shell/src/main/bash/run-class.sh             |  39 ++-
 12 files changed, 589 insertions(+), 12 deletions(-)

diff --git a/build.gradle b/build.gradle
index c4ca349..dbf777e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -89,6 +89,7 @@ rat {
     'samza-test/state/**',
     'README.md',
     'RELEASE.md',
+    'samza-core/src/test/resources/classloader/samza-framework-api-classes.txt',
     'samza-test/src/main/resources/**',
     'samza-hdfs/src/main/resources/**',
     'samza-hdfs/src/test/resources/**',
@@ -190,6 +191,7 @@ project(":samza-core_$scalaSuffix") {
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     compile "net.jodah:failsafe:$failsafeVersion"
+    compile "com.linkedin.cytodynamics:cytodynamics-nucleus:$cytodynamicsVersion"
     testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index f70e879..fcf472e 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -24,8 +24,9 @@
   commonsCodecVersion = "1.9"
   commonsCollectionVersion = "3.2.1"
   commonsHttpClientVersion = "3.1"
-  commonsLang3Version = "3.4"
   commonsIoVersion = "2.6"
+  commonsLang3Version = "3.4"
+  cytodynamicsVersion = "0.2.0"
   elasticsearchVersion = "2.2.0"
   gsonVersion = "2.8.5"
   guavaVersion = "23.0"
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java b/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
index 0d44659..8f933ea 100644
--- a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
@@ -25,16 +25,24 @@ public class DependencyIsolationUtils {
    * TODO make this configurable or taken from an environment variable
    */
   public static final String FRAMEWORK_API_DIRECTORY = "__samzaFrameworkApi";
+
   /**
    * Directory inside the home directory of the cluster-based job coordinator in which the framework infrastructure
    * artifacts are placed, for usage in dependency isolation for the cluster-based job coordinator.
    * TODO make this configurable or taken from an environment variable
    */
   public static final String FRAMEWORK_INFRASTRUCTURE_DIRECTORY = "__samzaFrameworkInfrastructure";
+
   /**
    * Directory inside the home directory of the cluster-based job coordinator in which the application artifacts are
    * placed, for usage in dependency isolation for the cluster-based job coordinator.
    * TODO make this configurable or taken from an environment variable
    */
   public static final String APPLICATION_DIRECTORY = "__package";
+
+  /**
+   * Name of the file which contains the class names (or globs) which should be loaded from the framework API
+   * classloader.
+   */
+  public static final String FRAMEWORK_API_CLASS_LIST_FILE_NAME = "samza-framework-api-classes.txt";
 }
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java b/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
new file mode 100644
index 0000000..47d1ea0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.classloader;
+
+import com.linkedin.cytodynamics.matcher.BootstrapClassPredicate;
+import com.linkedin.cytodynamics.matcher.GlobMatcher;
+import com.linkedin.cytodynamics.nucleus.DelegateRelationship;
+import com.linkedin.cytodynamics.nucleus.DelegateRelationshipBuilder;
+import com.linkedin.cytodynamics.nucleus.IsolationLevel;
+import com.linkedin.cytodynamics.nucleus.LoaderBuilder;
+import com.linkedin.cytodynamics.nucleus.OriginRestriction;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Use this to build a classloader for running Samza which isolates the Samza framework code/dependencies from the
+ * application code/dependencies.
+ */
+public class IsolatingClassLoaderFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(IsolatingClassLoaderFactory.class);
+
+  private static final String LIB_DIRECTORY = "lib";
+
+  /**
+   * Build a classloader which will isolate Samza framework code from application code. Samza framework classes and
+   * application-specific classes will be loaded using different classloaders. This will enable dependencies of each
+   * category of classes to also be loaded separately, so that runtime dependency conflicts do not happen.
+   * Each call to this method will build a different instance of a classloader.
+   *
+   * Samza framework API classes need to be specified in a file called
+   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} which is in the lib directory which is in the
+   * API package. The file needs to be generated when building the framework API package. This class will not generate
+   * the file.
+   *
+   * Implementation notes:
+   *
+   * The cytodynamics isolating classloader is used for this. It provides more control than the built-in
+   * {@link URLClassLoader}. Cytodynamics provides the ability to compose multiple classloaders together and have more
+   * granular delegation strategies between the classloaders.
+   *
+   * In order to share objects between classes loaded by different classloaders, the classes for the shared objects must
+   * be loaded by a common classloader. Those common classes will be loaded through a common API classloader. The
+   * cytodynamics classloader can be set up to only use the common API classloader for an explicit set of classes. The
+   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file should include the framework API classes.
+   * Also, bootstrap classes (e.g. java.lang.String) need to be loaded by a common classloader, since objects of those
+   * types need to be shared between framework and application code. There are also some static bootstrap classes
+   * which should be shared (e.g. java.lang.System). Bootstrap classes will be loaded through a common classloader by
+   * default.
+   *
+   * These are the classloaders which are used to make up the final classloader.
+   * <ul>
+   *   <li>bootstrap classloader: Built-in Java classes (e.g. java.lang.String)</li>
+   *   <li>API classloader: Common Samza framework API classes</li>
+   *   <li>infrastructure classloader: Core Samza framework classes and plugins that are included in the framework</li>
+   *   <li>
+   *     application classloader: Application code and plugins that are needed in the app but are not included in the
+   *     framework
+   *   </li>
+   * </ul>
+   *
+   * This is the delegation structure for the classloaders:
+   * <pre>
+   *   (bootstrap               (API                  (application
+   *   classloader) &lt;---- classloader) &lt;------- classloader)
+   *                             ^                      ^
+   *                             |                     /
+   *                             |                    /
+   *                             |                   /
+   *                             |                  /
+   *                         (infrastructure classloader)
+   * </pre>
+   * The cytodynamics classloader allows control over when the delegation should happen.
+   * <ol>
+   *   <li>API classloader delegates to the bootstrap classloader if the bootstrap classloader has the class.</li>
+   *   <li>
+   *     Infrastructure classloader only delegates to the API classloader for the common classes specified by
+   *     {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
+   *   </li>
+   *   <li>
+   *     Infrastructure classloader delegates to the application classloader when a class can't be found in the
+   *     infrastructure classloader.
+   *   </li>
+   *   <li>
+   *     Application classloader only delegates to the API classloader for the common classes specified by
+   *     {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
+   *   </li>
+   * </ol>
+   */
+  public ClassLoader buildClassLoader() {
+    // start at the user.dir to find the resources for the classpaths
+    String baseDirectoryPath = System.getProperty("user.dir");
+    File apiLibDirectory = libDirectory(new File(baseDirectoryPath, DependencyIsolationUtils.FRAMEWORK_API_DIRECTORY));
+    LOG.info("Using API lib directory: {}", apiLibDirectory);
+    File infrastructureLibDirectory =
+        libDirectory(new File(baseDirectoryPath, DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY));
+    LOG.info("Using infrastructure lib directory: {}", infrastructureLibDirectory);
+    File applicationLibDirectory =
+        libDirectory(new File(baseDirectoryPath, DependencyIsolationUtils.APPLICATION_DIRECTORY));
+    LOG.info("Using application lib directory: {}", applicationLibDirectory);
+
+    ClassLoader apiClassLoader = buildApiClassLoader(apiLibDirectory);
+    ClassLoader applicationClassLoader =
+        buildApplicationClassLoader(applicationLibDirectory, apiLibDirectory, apiClassLoader);
+
+    // the classloader to return is the one with the infrastructure classpath
+    return buildInfrastructureClassLoader(infrastructureLibDirectory, apiLibDirectory, apiClassLoader,
+        applicationClassLoader);
+  }
+
+  /**
+   * Build the {@link ClassLoader} which can load framework API classes.
+   *
+   * This sets up the link between the bootstrap classloader and the API classloader (see {@link #buildClassLoader()}).
+   */
+  private static ClassLoader buildApiClassLoader(File apiLibDirectory) {
+    /*
+     * This can just use the built-in classloading, which checks the parent classloader first and then checks its own
+     * classpath. A null parent means bootstrap classloader, which contains core Java classes (e.g. java.lang.String).
+     * This doesn't need to be isolated from the parent, because we only want to load all bootstrap classes from the
+     * bootstrap classloader.
+     */
+    return new URLClassLoader(getClasspathAsURLs(apiLibDirectory), null);
+  }
+
+  /**
+   * Build the {@link ClassLoader} which can load application classes.
+   *
+   * This sets up the link from the application classloader to the API classloader (see {@link #buildClassLoader()}).
+   */
+  private static ClassLoader buildApplicationClassLoader(File applicationLibDirectory, File apiLibDirectory,
+      ClassLoader apiClassLoader) {
+    return LoaderBuilder.anIsolatingLoader()
+        // look in application lib directory for JARs
+        .withClasspath(getClasspathAsURIs(applicationLibDirectory))
+        // getClasspathAsURIs should only return JARs within applicationLibDirectory anyways, but doing it to be safe
+        .withOriginRestriction(OriginRestriction.denyByDefault().allowingDirectory(applicationLibDirectory, false))
+        // delegate to the api classloader for API classes
+        .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
+        .build();
+  }
+
+  /**
+   * Build the {@link ClassLoader} which can load Samza framework core classes.
+   * This may also fall back to loading application classes.
+   *
+   * This sets up two links: One link between the infrastructure classloader and the API and another link between the
+   * infrastructure classloader and the application classloader (see {@link #buildClassLoader()}.
+   */
+  private static ClassLoader buildInfrastructureClassLoader(File infrastructureLibDirectory, File apiLibDirectory,
+      ClassLoader apiClassLoader, ClassLoader applicationClassLoader) {
+    return LoaderBuilder.anIsolatingLoader()
+        // look in infrastructure lib directory for JARs
+        .withClasspath(getClasspathAsURIs(infrastructureLibDirectory))
+        // getClasspathAsURIs should only return JARs within infrastructureLibDirectory anyways, but doing it to be safe
+        .withOriginRestriction(OriginRestriction.denyByDefault().allowingDirectory(infrastructureLibDirectory, false))
+        .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
+        /*
+         * Fall back to the application classloader for certain classes. For example, the application might implement
+         * some pluggable classes (e.g. SystemFactory). Another example is message schemas that are supplied by the
+         * application.
+         */
+        .addFallbackDelegate(DelegateRelationshipBuilder.builder()
+            .withDelegateClassLoader(applicationClassLoader)
+            /*
+             * NONE means that a class will be loaded from here if it is not found in the classpath of the loader that uses
+             * this relationship.
+             */
+            .withIsolationLevel(IsolationLevel.NONE)
+            .build())
+        .build();
+  }
+
+  /**
+   * Build a {@link DelegateRelationship} which defines how to delegate to the API classloader.
+   *
+   * Delegation will only happen for classes specified in
+   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} and the Java bootstrap classes.
+   */
+  private static DelegateRelationship buildApiParentRelationship(File apiLibDirectory, ClassLoader apiClassLoader) {
+    DelegateRelationshipBuilder apiParentRelationshipBuilder = DelegateRelationshipBuilder.builder()
+        // needs to load API classes from the API classloader
+        .withDelegateClassLoader(apiClassLoader)
+        /*
+         * FULL means to only load classes explicitly specified as "API" from the API classloader. We will use
+         * delegate-preferred class predicates to specify which classes are "API" (see below).
+         */
+        .withIsolationLevel(IsolationLevel.FULL);
+
+    // bootstrap classes need to be loaded from a common classloader
+    apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new BootstrapClassPredicate());
+    // the classes which are Samza framework API classes are added here
+    getFrameworkApiClassGlobs(apiLibDirectory).forEach(
+        apiClassName -> apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new GlobMatcher(apiClassName)));
+    return apiParentRelationshipBuilder.build();
+  }
+
+  /**
+   * Gets the globs for matching against classes to load from the framework API classloader. This will read the
+   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file in {@code directoryWithClassList} to get
+   * the globs.
+   *
+   * @param directoryWithClassList Directory in which
+   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} lives
+   * @return {@link List} of globs for matching against classes to load from the framework API classloader
+   */
+  @VisibleForTesting
+  static List<String> getFrameworkApiClassGlobs(File directoryWithClassList) {
+    File parentPreferredFile =
+        new File(directoryWithClassList, DependencyIsolationUtils.FRAMEWORK_API_CLASS_LIST_FILE_NAME);
+    validateCanAccess(parentPreferredFile);
+    try {
+      return Files.readAllLines(Paths.get(parentPreferredFile.toURI()), StandardCharsets.UTF_8)
+          .stream()
+          .filter(StringUtils::isNotBlank)
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new SamzaException("Error while reading samza-api class list", e);
+    }
+  }
+
+  /**
+   * Get the {@link URL}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
+   * not recursive.
+   */
+  @VisibleForTesting
+  static URL[] getClasspathAsURLs(File jarsLocation) {
+    validateCanAccess(jarsLocation);
+    File[] filesInJarsLocation = jarsLocation.listFiles();
+    if (filesInJarsLocation == null) {
+      throw new SamzaException(
+          String.format("Could not find any files inside %s, probably because it is not a directory",
+              jarsLocation.getPath()));
+    }
+    URL[] urls = Stream.of(filesInJarsLocation)
+        .filter(file -> file.getName().endsWith(".jar") || file.getName().endsWith(".war"))
+        .map(IsolatingClassLoaderFactory::fileURL)
+        .toArray(URL[]::new);
+    LOG.info("Found {} items to load into classpath from {}", urls.length, jarsLocation);
+    Stream.of(urls).forEach(url -> LOG.debug("Found {} from {}", url, jarsLocation));
+    return urls;
+  }
+
+  /**
+   * Get the {@link URI}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
+   * not recursive.
+   */
+  @VisibleForTesting
+  static List<URI> getClasspathAsURIs(File jarsLocation) {
+    return Stream.of(getClasspathAsURLs(jarsLocation))
+        .map(IsolatingClassLoaderFactory::urlToURI)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Makes sure that a file exists and can be read.
+   */
+  private static void validateCanAccess(File file) {
+    if (!file.exists() || !file.canRead()) {
+      throw new SamzaException("Unable to access file: " + file);
+    }
+  }
+
+  /**
+   * Get the {@link URL} for a {@link File}.
+   * Converts checked exceptions into {@link SamzaException}s.
+   */
+  private static URL fileURL(File file) {
+    URI uri = file.toURI();
+    try {
+      return uri.toURL();
+    } catch (MalformedURLException e) {
+      throw new SamzaException("Unable to get URL for file: " + file, e);
+    }
+  }
+
+  /**
+   * Get the {@link URI} for a {@link URL}.
+   * Converts checked exceptions into {@link SamzaException}s.
+   */
+  private static URI urlToURI(URL url) {
+    try {
+      return url.toURI();
+    } catch (URISyntaxException e) {
+      throw new SamzaException("Unable to get URI for URL: " + url, e);
+    }
+  }
+
+  /**
+   * Get the {@link File} representing the {@link #LIB_DIRECTORY} inside the given {@code file}.
+   */
+  private static File libDirectory(File file) {
+    return new File(file, LIB_DIRECTORY);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index a308c19..d9ca4d5 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -20,6 +20,8 @@ package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -27,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import org.apache.samza.SamzaException;
+import org.apache.samza.classloader.IsolatingClassLoaderFactory;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -403,10 +406,80 @@ public class ClusterBasedJobCoordinator {
   }
 
   /**
-   * The entry point for the {@link ClusterBasedJobCoordinator}
-   * @param args args
+   * The entry point for the {@link ClusterBasedJobCoordinator}.
    */
   public static void main(String[] args) {
+    boolean dependencyIsolationEnabled = Boolean.parseBoolean(
+        System.getenv(ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED()));
+    if (!dependencyIsolationEnabled) {
+      // no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
+      runClusterBasedJobCoordinator(args);
+    } else {
+      runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(), args);
+    }
+  }
+
+  /**
+   * Execute the coordinator using a separate isolated classloader.
+   * @param classLoader {@link ClassLoader} to use to load the {@link ClusterBasedJobCoordinator} which will run
+   * @param args arguments to pass when running the {@link ClusterBasedJobCoordinator}
+   */
+  @VisibleForTesting
+  static void runWithClassLoader(ClassLoader classLoader, String[] args) {
+    // need to use the isolated classloader to load ClusterBasedJobCoordinator and then run using that new class
+    Class<?> clusterBasedJobCoordinatorClass;
+    try {
+      clusterBasedJobCoordinatorClass = classLoader.loadClass(ClusterBasedJobCoordinator.class.getName());
+    } catch (ClassNotFoundException e) {
+      throw new SamzaException(
+          "Isolation was enabled, but unable to find ClusterBasedJobCoordinator in isolated classloader", e);
+    }
+
+    // save the current context classloader so it can be reset after finishing the call to runClusterBasedJobCoordinator
+    ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
+    // this is needed because certain libraries (e.g. log4j) use the context classloader
+    Thread.currentThread().setContextClassLoader(classLoader);
+
+    try {
+      executeRunClusterBasedJobCoordinatorForClass(clusterBasedJobCoordinatorClass, args);
+    } finally {
+      // reset the context class loader; it's good practice, and could be important when running a test suite
+      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
+    }
+  }
+
+  /**
+   * Runs the {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])} method of the given
+   * {@code clusterBasedJobCoordinatorClass} using reflection.
+   * @param clusterBasedJobCoordinatorClass {@link ClusterBasedJobCoordinator} {@link Class} for which to execute
+   * {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])}
+   * @param args arguments to pass to {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])}
+   */
+  private static void executeRunClusterBasedJobCoordinatorForClass(Class<?> clusterBasedJobCoordinatorClass,
+      String[] args) {
+    Method runClusterBasedJobCoordinatorMethod;
+    try {
+      runClusterBasedJobCoordinatorMethod =
+          clusterBasedJobCoordinatorClass.getDeclaredMethod("runClusterBasedJobCoordinator", String[].class);
+    } catch (NoSuchMethodException e) {
+      throw new SamzaException("Isolation was enabled, but unable to find runClusterBasedJobCoordinator method", e);
+    }
+    // only sets accessible flag for this Method instance, not other Method instances for runClusterBasedJobCoordinator
+    runClusterBasedJobCoordinatorMethod.setAccessible(true);
+
+    try {
+      // wrapping args in object array so that args is passed as a single argument to the method
+      runClusterBasedJobCoordinatorMethod.invoke(null, new Object[]{args});
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      throw new SamzaException("Exception while executing runClusterBasedJobCoordinator method", e);
+    }
+  }
+
+  /**
+   * This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
+   * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
+   */
+  private static void runClusterBasedJobCoordinator(String[] args) {
     Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
     try {
diff --git a/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java b/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
new file mode 100644
index 0000000..7444fbf
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.classloader;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Set;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.SamzaException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestIsolatingClassLoaderFactory {
+  @Test
+  public void testGetApiClasses() throws URISyntaxException {
+    File apiClassListFile = Paths.get(getClass().getResource("/classloader").toURI()).toFile();
+    List<String> apiClassNames = IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(apiClassListFile);
+    List<String> expected = ImmutableList.of(
+        "org.apache.samza.JavaClass",
+        "org.apache.samza.JavaClass$InnerJavaClass",
+        "org.apache.samza.ScalaClass$",
+        "org.apache.samza.ScalaClass$$anon$1",
+        "my.package.with.wildcard.*",
+        "my.package.with.question.mark?");
+    assertEquals(expected, apiClassNames);
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testGetApiClassesFileDoesNotExist() throws URISyntaxException {
+    File nonExistentDirectory =
+        new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
+    IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(nonExistentDirectory);
+  }
+
+  @Test
+  public void testGetClasspathAsURLs() throws URISyntaxException {
+    File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
+    URL[] classpath = IsolatingClassLoaderFactory.getClasspathAsURLs(classpathDirectory);
+    assertEquals(2, classpath.length);
+    Set<URL> classpathSet = ImmutableSet.copyOf(classpath);
+    URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
+    assertTrue(classpathSet.contains(jarUrl));
+    URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
+    assertTrue(classpathSet.contains(warUrl));
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testGetClasspathAsURLsDirectoryDoesNotExist() throws URISyntaxException {
+    File nonExistentDirectory =
+        new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
+    IsolatingClassLoaderFactory.getClasspathAsURLs(nonExistentDirectory);
+  }
+
+  @Test
+  public void testGetClasspathAsURIs() throws URISyntaxException {
+    File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
+    List<URI> classpath = IsolatingClassLoaderFactory.getClasspathAsURIs(classpathDirectory);
+    assertEquals(2, classpath.size());
+    Set<URI> classpathSet = ImmutableSet.copyOf(classpath);
+    URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
+    assertTrue(classpathSet.contains(jarUrl.toURI()));
+    URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
+    assertTrue(classpathSet.contains(warUrl.toURI()));
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testGetClasspathAsURIsDirectoryDoesNotExist() throws URISyntaxException {
+    File nonExistentDirectory =
+        new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
+    IsolatingClassLoaderFactory.getClasspathAsURIs(nonExistentDirectory);
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index ec79f3c..8c51c64 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -47,21 +47,25 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.AdditionalMatchers.aryEq;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.verifyPrivate;
+
 
 /**
  * Tests for {@link ClusterBasedJobCoordinator}
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(CoordinatorStreamUtil.class)
+@PrepareForTest({CoordinatorStreamUtil.class, ClusterBasedJobCoordinator.class})
 public class TestClusterBasedJobCoordinator {
 
   Map<String, String> configMap;
@@ -162,4 +166,36 @@ public class TestClusterBasedJobCoordinator {
     }
     fail("Expected run() method to stop after StartpointManager#stop()");
   }
+
+  @Test
+  public void testRunWithClassLoader() throws Exception {
+    // Verifies runWithClassLoader; spy() keeps its real code and only runClusterBasedJobCoordinator is stubbed
+    PowerMockito.spy(ClusterBasedJobCoordinator.class);
+    // remember the caller's context classloader: asserted to be restored below, and force-restored in finally
+    ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
+    ClassLoader classLoader = mock(ClassLoader.class);
+    String[] args = new String[]{"arg0", "arg1"};
+    doReturn(ClusterBasedJobCoordinator.class).when(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+
+    // stub the private static method, which is called by reflection on the class returned by classLoader
+    PowerMockito.doAnswer(invocation -> {
+        // every call to this method must receive exactly the args that were given to runWithClassLoader
+        assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
+        // while it runs, the context classloader must be the classloader passed to runWithClassLoader
+        assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
+        return null;
+      }).when(ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", any());
+
+    try {
+      ClusterBasedJobCoordinator.runWithClassLoader(classLoader, args);
+      assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
+    } finally {
+      // restore it explicitly in case runWithClassLoader throws, so later tests see the original classloader
+      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
+    }
+    // the coordinator class must have been loaded through the provided classloader
+    verify(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+    // runClusterBasedJobCoordinator must have been invoked once, with args equal to the array passed in
+    verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
+  }
 }
diff --git a/samza-core/src/test/resources/classloader/classpath/placeholder-jar.jar b/samza-core/src/test/resources/classloader/classpath/placeholder-jar.jar
new file mode 100644
index 0000000..d9b2beb
--- /dev/null
+++ b/samza-core/src/test/resources/classloader/classpath/placeholder-jar.jar
@@ -0,0 +1 @@
+NOT AN ACTUAL JAR; JUST A PLACEHOLDER FOR A TEST
diff --git a/samza-core/src/test/resources/classloader/classpath/placeholder-txt.json b/samza-core/src/test/resources/classloader/classpath/placeholder-txt.json
new file mode 100644
index 0000000..c4336ea
--- /dev/null
+++ b/samza-core/src/test/resources/classloader/classpath/placeholder-txt.json
@@ -0,0 +1 @@
+{"key": "This is a placeholder file which should be ignored in testing, since it is not a jar/war."}
\ No newline at end of file
diff --git a/samza-core/src/test/resources/classloader/classpath/placeholder-war.war b/samza-core/src/test/resources/classloader/classpath/placeholder-war.war
new file mode 100644
index 0000000..9fa2312
--- /dev/null
+++ b/samza-core/src/test/resources/classloader/classpath/placeholder-war.war
@@ -0,0 +1 @@
+NOT AN ACTUAL WAR; JUST A PLACEHOLDER FOR A TEST
diff --git a/samza-core/src/test/resources/classloader/samza-framework-api-classes.txt b/samza-core/src/test/resources/classloader/samza-framework-api-classes.txt
new file mode 100644
index 0000000..8b081d7
--- /dev/null
+++ b/samza-core/src/test/resources/classloader/samza-framework-api-classes.txt
@@ -0,0 +1,6 @@
+org.apache.samza.JavaClass
+org.apache.samza.JavaClass$InnerJavaClass
+org.apache.samza.ScalaClass$
+org.apache.samza.ScalaClass$$anon$1
+my.package.with.wildcard.*
+my.package.with.question.mark?
\ No newline at end of file
diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh
index 2b7d22e..8cdac30 100755
--- a/samza-shell/src/main/bash/run-class.sh
+++ b/samza-shell/src/main/bash/run-class.sh
@@ -39,10 +39,18 @@ fi
 HADOOP_YARN_HOME="${HADOOP_YARN_HOME:-$HOME/.samza}"
 HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$HADOOP_YARN_HOME/conf}"
 GC_LOG_ROTATION_OPTS="-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024"
-DEFAULT_LOG4J_FILE=$base_dir/lib/log4j.xml
-DEFAULT_LOG4J2_FILE=$base_dir/lib/log4j2.xml
+LOG4J2_FILE_NAME=log4j2.xml
+LOG4J_FILE_NAME=log4j.xml
 BASE_LIB_DIR="$base_dir/lib"
+DEFAULT_LOG4J2_FILE="$BASE_LIB_DIR/$LOG4J2_FILE_NAME"
+DEFAULT_LOG4J_FILE="$BASE_LIB_DIR/$LOG4J_FILE_NAME"
 
+# APPLICATION_LIB_DIR can be a directory which is different from $BASE_LIB_DIR which contains some additional
+# application-specific resources. If it is not set, then $BASE_LIB_DIR will be used as the value.
+APPLICATION_LIB_DIR="${APPLICATION_LIB_DIR:-$BASE_LIB_DIR}"
+export APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
+
+echo APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
 echo BASE_LIB_DIR=$BASE_LIB_DIR
 
 CLASSPATH=""
@@ -100,11 +108,28 @@ function check_and_enable_64_bit_mode {
 # Make the MDC inheritable to child threads by setting the system property to true if config not explicitly specified
 [[ $JAVA_OPTS != *-DisThreadContextMapInheritable* ]] && JAVA_OPTS="$JAVA_OPTS -DisThreadContextMapInheritable=true"
 
-# Check if log4j configuration is specified. If not - set to lib/log4j.xml
-if [[ -n $(find "$base_dir/lib" -regex ".*samza-log4j2.*.jar*") ]]; then
-    [[ $JAVA_OPTS != *-Dlog4j.configurationFile* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configurationFile=file:$DEFAULT_LOG4J2_FILE"
-elif [[ -n $(find "$base_dir/lib" -regex ".*samza-log4j.*.jar*") ]]; then
-    [[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$DEFAULT_LOG4J_FILE"
+# Check if log4j configuration is specified; if not, look for a configuration file:
+# 1) Check if using log4j or log4j2 (based on which samza-log4j jar is present in $BASE_LIB_DIR)
+# 2) Check if configuration file system property is already set
+# 3) If not, then look in $APPLICATION_LIB_DIR for configuration file (remember that $APPLICATION_LIB_DIR can be same or
+#    different from $BASE_LIB_DIR). A [[ -f ]] test is used so a symlinked or missing directory is handled quietly.
+# 4) If still can't find it, fall back to default (from $BASE_LIB_DIR).
+if [[ -n $(find "$BASE_LIB_DIR" -regex ".*samza-log4j2.*.jar*") ]]; then
+  if [[ $JAVA_OPTS != *-Dlog4j.configurationFile* ]]; then
+    if [[ -f "$APPLICATION_LIB_DIR/$LOG4J2_FILE_NAME" ]]; then
+      export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configurationFile=file:$APPLICATION_LIB_DIR/$LOG4J2_FILE_NAME"
+    else
+      export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configurationFile=file:$DEFAULT_LOG4J2_FILE"
+    fi
+  fi
+elif [[ -n $(find "$BASE_LIB_DIR" -regex ".*samza-log4j.*.jar*") ]]; then
+  if [[ $JAVA_OPTS != *-Dlog4j.configuration* ]]; then
+    if [[ -f "$APPLICATION_LIB_DIR/$LOG4J_FILE_NAME" ]]; then
+      export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$APPLICATION_LIB_DIR/$LOG4J_FILE_NAME"
+    else
+      export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$DEFAULT_LOG4J_FILE"
+    fi
+  fi
 fi
 
 # Check if samza.log.dir is specified. If not - set to environment variable if it is set