You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2019/11/26 19:13:35 UTC
[samza] branch master updated: SAMZA-2333: [AM isolation] Use
cytodynamics classloader to launch job coordinator (#1173)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 3832bf8 SAMZA-2333: [AM isolation] Use cytodynamics classloader to launch job coordinator (#1173)
3832bf8 is described below
commit 3832bf86b1a9d6f1c171bab3dff80b3ac91f59a6
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Tue Nov 26 11:13:28 2019 -0800
SAMZA-2333: [AM isolation] Use cytodynamics classloader to launch job coordinator (#1173)
---
build.gradle | 2 +
gradle/dependency-versions.gradle | 3 +-
.../classloader/DependencyIsolationUtils.java | 8 +
.../classloader/IsolatingClassLoaderFactory.java | 327 +++++++++++++++++++++
.../clustermanager/ClusterBasedJobCoordinator.java | 77 ++++-
.../TestIsolatingClassLoaderFactory.java | 96 ++++++
.../TestClusterBasedJobCoordinator.java | 40 ++-
.../classloader/classpath/placeholder-jar.jar | 1 +
.../classloader/classpath/placeholder-txt.json | 1 +
.../classloader/classpath/placeholder-war.war | 1 +
.../classloader/samza-framework-api-classes.txt | 6 +
samza-shell/src/main/bash/run-class.sh | 39 ++-
12 files changed, 589 insertions(+), 12 deletions(-)
diff --git a/build.gradle b/build.gradle
index c4ca349..dbf777e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -89,6 +89,7 @@ rat {
'samza-test/state/**',
'README.md',
'RELEASE.md',
+ 'samza-core/src/test/resources/classloader/samza-framework-api-classes.txt',
'samza-test/src/main/resources/**',
'samza-hdfs/src/main/resources/**',
'samza-hdfs/src/test/resources/**',
@@ -190,6 +191,7 @@ project(":samza-core_$scalaSuffix") {
compile "org.scala-lang:scala-library:$scalaVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
compile "net.jodah:failsafe:$failsafeVersion"
+ compile "com.linkedin.cytodynamics:cytodynamics-nucleus:$cytodynamicsVersion"
testCompile project(":samza-api").sourceSets.test.output
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index f70e879..fcf472e 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -24,8 +24,9 @@
commonsCodecVersion = "1.9"
commonsCollectionVersion = "3.2.1"
commonsHttpClientVersion = "3.1"
- commonsLang3Version = "3.4"
commonsIoVersion = "2.6"
+ commonsLang3Version = "3.4"
+ cytodynamicsVersion = "0.2.0"
elasticsearchVersion = "2.2.0"
gsonVersion = "2.8.5"
guavaVersion = "23.0"
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java b/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
index 0d44659..8f933ea 100644
--- a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
@@ -25,16 +25,24 @@ public class DependencyIsolationUtils {
* TODO make this configurable or taken from an environment variable
*/
public static final String FRAMEWORK_API_DIRECTORY = "__samzaFrameworkApi";
+
/**
* Directory inside the home directory of the cluster-based job coordinator in which the framework infrastructure
* artifacts are placed, for usage in dependency isolation for the cluster-based job coordinator.
* TODO make this configurable or taken from an environment variable
*/
public static final String FRAMEWORK_INFRASTRUCTURE_DIRECTORY = "__samzaFrameworkInfrastructure";
+
/**
* Directory inside the home directory of the cluster-based job coordinator in which the application artifacts are
* placed, for usage in dependency isolation for the cluster-based job coordinator.
* TODO make this configurable or taken from an environment variable
*/
public static final String APPLICATION_DIRECTORY = "__package";
+
+ /**
+ * Name of the file which contains the class names (or globs) which should be loaded from the framework API
+ * classloader.
+ */
+ public static final String FRAMEWORK_API_CLASS_LIST_FILE_NAME = "samza-framework-api-classes.txt";
}
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java b/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
new file mode 100644
index 0000000..47d1ea0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.classloader;
+
+import com.linkedin.cytodynamics.matcher.BootstrapClassPredicate;
+import com.linkedin.cytodynamics.matcher.GlobMatcher;
+import com.linkedin.cytodynamics.nucleus.DelegateRelationship;
+import com.linkedin.cytodynamics.nucleus.DelegateRelationshipBuilder;
+import com.linkedin.cytodynamics.nucleus.IsolationLevel;
+import com.linkedin.cytodynamics.nucleus.LoaderBuilder;
+import com.linkedin.cytodynamics.nucleus.OriginRestriction;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Use this to build a classloader for running Samza which isolates the Samza framework code/dependencies from the
+ * application code/dependencies.
+ */
+public class IsolatingClassLoaderFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(IsolatingClassLoaderFactory.class);
+
+ private static final String LIB_DIRECTORY = "lib";
+
+ /**
+ * Build a classloader which will isolate Samza framework code from application code. Samza framework classes and
+ * application-specific classes will be loaded using different classloaders. This will enable dependencies of each
+ * category of classes to also be loaded separately, so that runtime dependency conflicts do not happen.
+ * Each call to this method will build a different instance of a classloader.
+ *
+ * Samza framework API classes need to be specified in a file called
+ * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} which is in the lib directory which is in the
+ * API package. The file needs to be generated when building the framework API package. This class will not generate
+ * the file.
+ *
+ * Implementation notes:
+ *
+ * The cytodynamics isolating classloader is used for this. It provides more control than the built-in
+ * {@link URLClassLoader}. Cytodynamics provides the ability to compose multiple classloaders together and have more
+ * granular delegation strategies between the classloaders.
+ *
+ * In order to share objects between classes loaded by different classloaders, the classes for the shared objects must
+ * be loaded by a common classloader. Those common classes will be loaded through a common API classloader. The
+ * cytodynamics classloader can be set up to only use the common API classloader for an explicit set of classes. The
+ * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file should include the framework API classes.
+ * Also, bootstrap classes (e.g. java.lang.String) need to be loaded by a common classloader, since objects of those
+ * types need to be shared across the framework and the application. There are also some static bootstrap classes
+ * which should be shared (e.g. java.lang.System). Bootstrap classes will be loaded through a common classloader by
+ * default.
+ *
+ * These are the classloaders which are used to make up the final classloader.
+ * <ul>
+ * <li>bootstrap classloader: Built-in Java classes (e.g. java.lang.String)</li>
+ * <li>API classloader: Common Samza framework API classes</li>
+ * <li>infrastructure classloader: Core Samza framework classes and plugins that are included in the framework</li>
+ * <li>
+ * application classloader: Application code and plugins that are needed in the app but are not included in the
+ * framework
+ * </li>
+ * </ul>
+ *
+ * This is the delegation structure for the classloaders:
+ * <pre>
+ * (bootstrap (API (application
+ * classloader) <---- classloader) <------- classloader)
+ * ^ ^
+ * | /
+ * | /
+ * | /
+ * | /
+ * (infrastructure classloader)
+ * </pre>
+ * The cytodynamics classloader allows control over when the delegation should happen.
+ * <ol>
+ * <li>API classloader delegates to the bootstrap classloader if the bootstrap classloader has the class.</li>
+ * <li>
+ * Infrastructure classloader only delegates to the API classloader for the common classes specified by
+ * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
+ * </li>
+ * <li>
+ * Infrastructure classloader delegates to the application classloader when a class can't be found in the
+ * infrastructure classloader.
+ * </li>
+ * <li>
+ * Application classloader only delegates to the API classloader for the common classes specified by
+ * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
+ * </li>
+ * </ol>
+ */
+ public ClassLoader buildClassLoader() {
+ // start at the user.dir to find the resources for the classpaths
+ String baseDirectoryPath = System.getProperty("user.dir");
+ File apiLibDirectory = libDirectory(new File(baseDirectoryPath, DependencyIsolationUtils.FRAMEWORK_API_DIRECTORY));
+ LOG.info("Using API lib directory: {}", apiLibDirectory);
+ File infrastructureLibDirectory =
+ libDirectory(new File(baseDirectoryPath, DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY));
+ LOG.info("Using infrastructure lib directory: {}", infrastructureLibDirectory);
+ File applicationLibDirectory =
+ libDirectory(new File(baseDirectoryPath, DependencyIsolationUtils.APPLICATION_DIRECTORY));
+ LOG.info("Using application lib directory: {}", applicationLibDirectory);
+
+ ClassLoader apiClassLoader = buildApiClassLoader(apiLibDirectory);
+ ClassLoader applicationClassLoader =
+ buildApplicationClassLoader(applicationLibDirectory, apiLibDirectory, apiClassLoader);
+
+ // the classloader to return is the one with the infrastructure classpath
+ return buildInfrastructureClassLoader(infrastructureLibDirectory, apiLibDirectory, apiClassLoader,
+ applicationClassLoader);
+ }
+
+ /**
+ * Build the {@link ClassLoader} which can load framework API classes.
+ *
+ * This sets up the link between the bootstrap classloader and the API classloader (see {@link #buildClassLoader()}).
+ */
+ private static ClassLoader buildApiClassLoader(File apiLibDirectory) {
+ /*
+ * This can just use the built-in classloading, which checks the parent classloader first and then checks its own
+ * classpath. A null parent means bootstrap classloader, which contains core Java classes (e.g. java.lang.String).
+ * This doesn't need to be isolated from the parent, because we only want to load all bootstrap classes from the
+ * bootstrap classloader.
+ */
+ return new URLClassLoader(getClasspathAsURLs(apiLibDirectory), null);
+ }
+
+ /**
+ * Build the {@link ClassLoader} which can load application classes.
+ *
+ * This sets up the link between the application classloader and the API classloader (see {@link #buildClassLoader()}).
+ */
+ private static ClassLoader buildApplicationClassLoader(File applicationLibDirectory, File apiLibDirectory,
+ ClassLoader apiClassLoader) {
+ return LoaderBuilder.anIsolatingLoader()
+ // look in application lib directory for JARs
+ .withClasspath(getClasspathAsURIs(applicationLibDirectory))
+ // getClasspathAsURIs should only return JARs within applicationLibDirectory anyways, but doing it to be safe
+ .withOriginRestriction(OriginRestriction.denyByDefault().allowingDirectory(applicationLibDirectory, false))
+ // delegate to the api classloader for API classes
+ .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
+ .build();
+ }
+
+ /**
+ * Build the {@link ClassLoader} which can load Samza framework core classes.
+ * This may also fall back to loading application classes.
+ *
+ * This sets up two links: One link between the infrastructure classloader and the API and another link between the
+ * infrastructure classloader and the application classloader (see {@link #buildClassLoader()}).
+ */
+ private static ClassLoader buildInfrastructureClassLoader(File infrastructureLibDirectory, File apiLibDirectory,
+ ClassLoader apiClassLoader, ClassLoader applicationClassLoader) {
+ return LoaderBuilder.anIsolatingLoader()
+ // look in infrastructure lib directory for JARs
+ .withClasspath(getClasspathAsURIs(infrastructureLibDirectory))
+ // getClasspathAsURIs should only return JARs within infrastructureLibDirectory anyways, but doing it to be safe
+ .withOriginRestriction(OriginRestriction.denyByDefault().allowingDirectory(infrastructureLibDirectory, false))
+ .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
+ /*
+ * Fall back to the application classloader for certain classes. For example, the application might implement
+ * some pluggable classes (e.g. SystemFactory). Another example is message schemas that are supplied by the
+ * application.
+ */
+ .addFallbackDelegate(DelegateRelationshipBuilder.builder()
+ .withDelegateClassLoader(applicationClassLoader)
+ /*
+ * NONE means that a class will be loaded from here if it is not found in the classpath of the loader that uses
+ * this relationship.
+ */
+ .withIsolationLevel(IsolationLevel.NONE)
+ .build())
+ .build();
+ }
+
+ /**
+ * Build a {@link DelegateRelationship} which defines how to delegate to the API classloader.
+ *
+ * Delegation will only happen for classes specified in
+ * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} and the Java bootstrap classes.
+ */
+ private static DelegateRelationship buildApiParentRelationship(File apiLibDirectory, ClassLoader apiClassLoader) {
+ DelegateRelationshipBuilder apiParentRelationshipBuilder = DelegateRelationshipBuilder.builder()
+ // needs to load API classes from the API classloader
+ .withDelegateClassLoader(apiClassLoader)
+ /*
+ * FULL means to only load classes explicitly specified as "API" from the API classloader. We will use
+ * delegate-preferred class predicates to specify which classes are "API" (see below).
+ */
+ .withIsolationLevel(IsolationLevel.FULL);
+
+ // bootstrap classes need to be loaded from a common classloader
+ apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new BootstrapClassPredicate());
+ // the classes which are Samza framework API classes are added here
+ getFrameworkApiClassGlobs(apiLibDirectory).forEach(
+ apiClassName -> apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new GlobMatcher(apiClassName)));
+ return apiParentRelationshipBuilder.build();
+ }
+
+ /**
+ * Gets the globs for matching against classes to load from the framework API classloader. This will read the
+ * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file in {@code directoryWithClassList} to get
+ * the globs.
+ *
+ * @param directoryWithClassList Directory in which
+ * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} lives
+ * @return {@link List} of globs for matching against classes to load from the framework API classloader
+ */
+ @VisibleForTesting
+ static List<String> getFrameworkApiClassGlobs(File directoryWithClassList) {
+ File parentPreferredFile =
+ new File(directoryWithClassList, DependencyIsolationUtils.FRAMEWORK_API_CLASS_LIST_FILE_NAME);
+ validateCanAccess(parentPreferredFile);
+ try {
+ return Files.readAllLines(Paths.get(parentPreferredFile.toURI()), StandardCharsets.UTF_8)
+ .stream()
+ .filter(StringUtils::isNotBlank)
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new SamzaException("Error while reading samza-api class list", e);
+ }
+ }
+
+ /**
+ * Get the {@link URL}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
+ * not recursive.
+ */
+ @VisibleForTesting
+ static URL[] getClasspathAsURLs(File jarsLocation) {
+ validateCanAccess(jarsLocation);
+ File[] filesInJarsLocation = jarsLocation.listFiles();
+ if (filesInJarsLocation == null) {
+ throw new SamzaException(
+ String.format("Could not find any files inside %s, probably because it is not a directory",
+ jarsLocation.getPath()));
+ }
+ URL[] urls = Stream.of(filesInJarsLocation)
+ .filter(file -> file.getName().endsWith(".jar") || file.getName().endsWith(".war"))
+ .map(IsolatingClassLoaderFactory::fileURL)
+ .toArray(URL[]::new);
+ LOG.info("Found {} items to load into classpath from {}", urls.length, jarsLocation);
+ Stream.of(urls).forEach(url -> LOG.debug("Found {} from {}", url, jarsLocation));
+ return urls;
+ }
+
+ /**
+ * Get the {@link URI}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
+ * not recursive.
+ */
+ @VisibleForTesting
+ static List<URI> getClasspathAsURIs(File jarsLocation) {
+ return Stream.of(getClasspathAsURLs(jarsLocation))
+ .map(IsolatingClassLoaderFactory::urlToURI)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Makes sure that a file exists and can be read.
+ */
+ private static void validateCanAccess(File file) {
+ if (!file.exists() || !file.canRead()) {
+ throw new SamzaException("Unable to access file: " + file);
+ }
+ }
+
+ /**
+ * Get the {@link URL} for a {@link File}.
+ * Converts checked exceptions into {@link SamzaException}s.
+ */
+ private static URL fileURL(File file) {
+ URI uri = file.toURI();
+ try {
+ return uri.toURL();
+ } catch (MalformedURLException e) {
+ throw new SamzaException("Unable to get URL for file: " + file, e);
+ }
+ }
+
+ /**
+ * Get the {@link URI} for a {@link URL}.
+ * Converts checked exceptions into {@link SamzaException}s.
+ */
+ private static URI urlToURI(URL url) {
+ try {
+ return url.toURI();
+ } catch (URISyntaxException e) {
+ throw new SamzaException("Unable to get URI for URL: " + url, e);
+ }
+ }
+
+ /**
+ * Get the {@link File} representing the {@link #LIB_DIRECTORY} inside the given {@code file}.
+ */
+ private static File libDirectory(File file) {
+ return new File(file, LIB_DIRECTORY);
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index a308c19..d9ca4d5 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -20,6 +20,8 @@ package org.apache.samza.clustermanager;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -27,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.samza.SamzaException;
+import org.apache.samza.classloader.IsolatingClassLoaderFactory;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
@@ -403,10 +406,80 @@ public class ClusterBasedJobCoordinator {
}
/**
- * The entry point for the {@link ClusterBasedJobCoordinator}
- * @param args args
+ * The entry point for the {@link ClusterBasedJobCoordinator}.
*/
public static void main(String[] args) {
+ boolean dependencyIsolationEnabled = Boolean.parseBoolean(
+ System.getenv(ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED()));
+ if (!dependencyIsolationEnabled) {
+ // no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
+ runClusterBasedJobCoordinator(args);
+ } else {
+ runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(), args);
+ }
+ }
+
+ /**
+ * Execute the coordinator using a separate isolated classloader.
+ * @param classLoader {@link ClassLoader} to use to load the {@link ClusterBasedJobCoordinator} which will run
+ * @param args arguments to pass when running the {@link ClusterBasedJobCoordinator}
+ */
+ @VisibleForTesting
+ static void runWithClassLoader(ClassLoader classLoader, String[] args) {
+ // need to use the isolated classloader to load ClusterBasedJobCoordinator and then run using that new class
+ Class<?> clusterBasedJobCoordinatorClass;
+ try {
+ clusterBasedJobCoordinatorClass = classLoader.loadClass(ClusterBasedJobCoordinator.class.getName());
+ } catch (ClassNotFoundException e) {
+ throw new SamzaException(
+ "Isolation was enabled, but unable to find ClusterBasedJobCoordinator in isolated classloader", e);
+ }
+
+ // save the current context classloader so it can be reset after finishing the call to runClusterBasedJobCoordinator
+ ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
+ // this is needed because certain libraries (e.g. log4j) use the context classloader
+ Thread.currentThread().setContextClassLoader(classLoader);
+
+ try {
+ executeRunClusterBasedJobCoordinatorForClass(clusterBasedJobCoordinatorClass, args);
+ } finally {
+ // reset the context class loader; it's good practice, and could be important when running a test suite
+ Thread.currentThread().setContextClassLoader(previousContextClassLoader);
+ }
+ }
+
+ /**
+ * Runs the {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])} method of the given
+ * {@code clusterBasedJobCoordinatorClass} using reflection.
+ * @param clusterBasedJobCoordinatorClass {@link ClusterBasedJobCoordinator} {@link Class} for which to execute
+ * {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])}
+ * @param args arguments to pass to {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])}
+ */
+ private static void executeRunClusterBasedJobCoordinatorForClass(Class<?> clusterBasedJobCoordinatorClass,
+ String[] args) {
+ Method runClusterBasedJobCoordinatorMethod;
+ try {
+ runClusterBasedJobCoordinatorMethod =
+ clusterBasedJobCoordinatorClass.getDeclaredMethod("runClusterBasedJobCoordinator", String[].class);
+ } catch (NoSuchMethodException e) {
+ throw new SamzaException("Isolation was enabled, but unable to find runClusterBasedJobCoordinator method", e);
+ }
+ // only sets accessible flag for this Method instance, not other Method instances for runClusterBasedJobCoordinator
+ runClusterBasedJobCoordinatorMethod.setAccessible(true);
+
+ try {
+ // wrapping args in object array so that args is passed as a single argument to the method
+ runClusterBasedJobCoordinatorMethod.invoke(null, new Object[]{args});
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new SamzaException("Exception while executing runClusterBasedJobCoordinator method", e);
+ }
+ }
+
+ /**
+ * This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
+ * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
+ */
+ private static void runClusterBasedJobCoordinator(String[] args) {
Config coordinatorSystemConfig;
final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
try {
diff --git a/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java b/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
new file mode 100644
index 0000000..7444fbf
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.classloader;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Set;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.SamzaException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestIsolatingClassLoaderFactory {
+ @Test
+ public void testGetApiClasses() throws URISyntaxException {
+ File apiClassListFile = Paths.get(getClass().getResource("/classloader").toURI()).toFile();
+ List<String> apiClassNames = IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(apiClassListFile);
+ List<String> expected = ImmutableList.of(
+ "org.apache.samza.JavaClass",
+ "org.apache.samza.JavaClass$InnerJavaClass",
+ "org.apache.samza.ScalaClass$",
+ "org.apache.samza.ScalaClass$$anon$1",
+ "my.package.with.wildcard.*",
+ "my.package.with.question.mark?");
+ assertEquals(expected, apiClassNames);
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testGetApiClassesFileDoesNotExist() throws URISyntaxException {
+ File nonExistentDirectory =
+ new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
+ IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(nonExistentDirectory);
+ }
+
+ @Test
+ public void testGetClasspathAsURLs() throws URISyntaxException {
+ File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
+ URL[] classpath = IsolatingClassLoaderFactory.getClasspathAsURLs(classpathDirectory);
+ assertEquals(2, classpath.length);
+ Set<URL> classpathSet = ImmutableSet.copyOf(classpath);
+ URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
+ assertTrue(classpathSet.contains(jarUrl));
+ URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
+ assertTrue(classpathSet.contains(warUrl));
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testGetClasspathAsURLsDirectoryDoesNotExist() throws URISyntaxException {
+ File nonExistentDirectory =
+ new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
+ IsolatingClassLoaderFactory.getClasspathAsURLs(nonExistentDirectory);
+ }
+
+ @Test
+ public void testGetClasspathAsURIs() throws URISyntaxException {
+ File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
+ List<URI> classpath = IsolatingClassLoaderFactory.getClasspathAsURIs(classpathDirectory);
+ assertEquals(2, classpath.size());
+ Set<URI> classpathSet = ImmutableSet.copyOf(classpath);
+ URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
+ assertTrue(classpathSet.contains(jarUrl.toURI()));
+ URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
+ assertTrue(classpathSet.contains(warUrl.toURI()));
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testGetClasspathAsURIsDirectoryDoesNotExist() throws URISyntaxException {
+ File nonExistentDirectory =
+ new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
+ IsolatingClassLoaderFactory.getClasspathAsURIs(nonExistentDirectory);
+ }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index ec79f3c..8c51c64 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -47,21 +47,25 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.verifyPrivate;
+
/**
* Tests for {@link ClusterBasedJobCoordinator}
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest(CoordinatorStreamUtil.class)
+@PrepareForTest({CoordinatorStreamUtil.class, ClusterBasedJobCoordinator.class})
public class TestClusterBasedJobCoordinator {
Map<String, String> configMap;
@@ -162,4 +166,36 @@ public class TestClusterBasedJobCoordinator {
}
fail("Expected run() method to stop after StartpointManager#stop()");
}
+
+ @Test
+ public void testRunWithClassLoader() throws Exception {
+ // partially mock ClusterBasedJobCoordinator (mock runClusterBasedJobCoordinator method only)
+ PowerMockito.spy(ClusterBasedJobCoordinator.class);
+ // save the context classloader to make sure that it gets set properly once the test is finished
+ ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
+ ClassLoader classLoader = mock(ClassLoader.class);
+ String[] args = new String[]{"arg0", "arg1"};
+ doReturn(ClusterBasedJobCoordinator.class).when(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+
+ // stub the private static method which is called by reflection
+ PowerMockito.doAnswer(invocation -> {
+ // make sure the only call to this method has the expected arguments
+ assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
+ // checks that the context classloader is set correctly
+ assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
+ return null;
+ }).when(ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", any());
+
+ try {
+ ClusterBasedJobCoordinator.runWithClassLoader(classLoader, args);
+ assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
+ } finally {
+ // reset it explicitly just in case runWithClassLoader throws an exception
+ Thread.currentThread().setContextClassLoader(previousContextClassLoader);
+ }
+ // make sure that the classloader got used
+ verify(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+ // make sure runClusterBasedJobCoordinator only got called once
+ verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
+ }
}
diff --git a/samza-core/src/test/resources/classloader/classpath/placeholder-jar.jar b/samza-core/src/test/resources/classloader/classpath/placeholder-jar.jar
new file mode 100644
index 0000000..d9b2beb
--- /dev/null
+++ b/samza-core/src/test/resources/classloader/classpath/placeholder-jar.jar
@@ -0,0 +1 @@
+NOT AN ACTUAL JAR; JUST A PLACEHOLDER FOR A TEST
diff --git a/samza-core/src/test/resources/classloader/classpath/placeholder-txt.json b/samza-core/src/test/resources/classloader/classpath/placeholder-txt.json
new file mode 100644
index 0000000..c4336ea
--- /dev/null
+++ b/samza-core/src/test/resources/classloader/classpath/placeholder-txt.json
@@ -0,0 +1 @@
+{"key": "This is a placeholder file which should be ignored in testing, since it is not a jar/war."}
\ No newline at end of file
diff --git a/samza-core/src/test/resources/classloader/classpath/placeholder-war.war b/samza-core/src/test/resources/classloader/classpath/placeholder-war.war
new file mode 100644
index 0000000..9fa2312
--- /dev/null
+++ b/samza-core/src/test/resources/classloader/classpath/placeholder-war.war
@@ -0,0 +1 @@
+NOT AN ACTUAL WAR; JUST A PLACEHOLDER FOR A TEST
diff --git a/samza-core/src/test/resources/classloader/samza-framework-api-classes.txt b/samza-core/src/test/resources/classloader/samza-framework-api-classes.txt
new file mode 100644
index 0000000..8b081d7
--- /dev/null
+++ b/samza-core/src/test/resources/classloader/samza-framework-api-classes.txt
@@ -0,0 +1,6 @@
+org.apache.samza.JavaClass
+org.apache.samza.JavaClass$InnerJavaClass
+org.apache.samza.ScalaClass$
+org.apache.samza.ScalaClass$$anon$1
+my.package.with.wildcard.*
+my.package.with.question.mark?
\ No newline at end of file
diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh
index 2b7d22e..8cdac30 100755
--- a/samza-shell/src/main/bash/run-class.sh
+++ b/samza-shell/src/main/bash/run-class.sh
@@ -39,10 +39,18 @@ fi
HADOOP_YARN_HOME="${HADOOP_YARN_HOME:-$HOME/.samza}"
HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$HADOOP_YARN_HOME/conf}"
GC_LOG_ROTATION_OPTS="-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024"
-DEFAULT_LOG4J_FILE=$base_dir/lib/log4j.xml
-DEFAULT_LOG4J2_FILE=$base_dir/lib/log4j2.xml
+LOG4J_FILE_NAME="log4j.xml"
+LOG4J2_FILE_NAME="log4j2.xml"
BASE_LIB_DIR="$base_dir/lib"
+DEFAULT_LOG4J_FILE=$BASE_LIB_DIR/$LOG4J_FILE_NAME
+DEFAULT_LOG4J2_FILE=$BASE_LIB_DIR/$LOG4J2_FILE_NAME
+# APPLICATION_LIB_DIR can point to a directory, different from $BASE_LIB_DIR, that contains additional
+# application-specific resources. If it is not set, then $BASE_LIB_DIR will be used as the value.
+APPLICATION_LIB_DIR="${APPLICATION_LIB_DIR:-$BASE_LIB_DIR}"
+export APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
+
+echo APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
echo BASE_LIB_DIR=$BASE_LIB_DIR
CLASSPATH=""
@@ -100,11 +108,28 @@ function check_and_enable_64_bit_mode {
# Make the MDC inheritable to child threads by setting the system property to true if config not explicitly specified
[[ $JAVA_OPTS != *-DisThreadContextMapInheritable* ]] && JAVA_OPTS="$JAVA_OPTS -DisThreadContextMapInheritable=true"
-# Check if log4j configuration is specified. If not - set to lib/log4j.xml
-if [[ -n $(find "$base_dir/lib" -regex ".*samza-log4j2.*.jar*") ]]; then
- [[ $JAVA_OPTS != *-Dlog4j.configurationFile* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configurationFile=file:$DEFAULT_LOG4J2_FILE"
-elif [[ -n $(find "$base_dir/lib" -regex ".*samza-log4j.*.jar*") ]]; then
- [[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$DEFAULT_LOG4J_FILE"
+# Check if a log4j configuration is specified; if not, look for a configuration file:
+# 1) Check whether log4j or log4j2 is being used
+# 2) Check if the configuration file system property is already set
+# 3) If not, then look in $APPLICATION_LIB_DIR for the configuration file (remember that $APPLICATION_LIB_DIR can be the
+# same as or different from $BASE_LIB_DIR).
+# 4) If it still cannot be found, fall back to the default (from $BASE_LIB_DIR).
+if [[ -n $(find "$BASE_LIB_DIR" -regex ".*samza-log4j2.*.jar*") ]]; then
+ if [[ $JAVA_OPTS != *-Dlog4j.configurationFile* ]]; then
+ if [[ -n $(find "$APPLICATION_LIB_DIR" -maxdepth 1 -name $LOG4J2_FILE_NAME) ]]; then
+ export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configurationFile=file:$APPLICATION_LIB_DIR/$LOG4J2_FILE_NAME"
+ else
+ export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configurationFile=file:$DEFAULT_LOG4J2_FILE"
+ fi
+ fi
+elif [[ -n $(find "$BASE_LIB_DIR" -regex ".*samza-log4j.*.jar*") ]]; then
+ if [[ $JAVA_OPTS != *-Dlog4j.configuration* ]]; then
+ if [[ -n $(find "$APPLICATION_LIB_DIR" -maxdepth 1 -name $LOG4J_FILE_NAME) ]]; then
+ export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$APPLICATION_LIB_DIR/$LOG4J_FILE_NAME"
+ else
+ export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$DEFAULT_LOG4J_FILE"
+ fi
+ fi
fi
# Check if samza.log.dir is specified. If not - set to environment variable if it is set