You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/04/28 21:27:56 UTC
[samza] branch master updated: SAMZA-2647: Clean up unused split
deployment code (#1493)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new de067c4 SAMZA-2647: Clean up unused split deployment code (#1493)
de067c4 is described below
commit de067c48d8bbac894e0efa41cda37ebf26551e42
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Wed Apr 28 14:27:47 2021 -0700
SAMZA-2647: Clean up unused split deployment code (#1493)
Issues: Some code was added for SEP-24 some time ago (#1172, #1173), but we are not moving forward with SEP-24 because it does not cleanly handle certain use cases. Since this code is no longer needed, it should be removed.
Changes:
1. Removed unused flows related to job coordinator dependency isolation.
2. Removed unused classloader separation utils.
API changes and usage/upgrade instructions:
Removed some configs and environment variables related to split deployment. The feature was never completed, so Samza jobs should not have been using them anyway.
---
build.gradle | 1 -
gradle/dependency-versions.gradle | 1 -
.../classloader/DependencyIsolationUtils.java | 50 ---
.../classloader/IsolatingClassLoaderFactory.java | 352 ---------------------
.../ClusterBasedJobCoordinatorRunner.java | 13 +-
.../java/org/apache/samza/config/JobConfig.java | 6 -
.../apache/samza/config/ShellCommandConfig.java | 23 --
.../org/apache/samza/util/SplitDeploymentUtil.java | 88 ------
.../TestIsolatingClassLoaderFactory.java | 96 ------
.../org/apache/samza/config/TestJobConfig.java | 11 -
.../apache/samza/util/TestSplitDeploymentUtil.java | 71 -----
samza-shell/src/main/bash/run-class.sh | 34 +-
.../scala/org/apache/samza/job/yarn/YarnJob.scala | 28 +-
.../org/apache/samza/job/yarn/TestYarnJob.java | 45 ---
14 files changed, 15 insertions(+), 804 deletions(-)
diff --git a/build.gradle b/build.gradle
index 9c48b23..64cef19 100644
--- a/build.gradle
+++ b/build.gradle
@@ -195,7 +195,6 @@ project(":samza-core_$scalaSuffix") {
compile "org.scala-lang:scala-library:$scalaVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
compile "net.jodah:failsafe:$failsafeVersion"
- compile "com.linkedin.cytodynamics:cytodynamics-nucleus:$cytodynamicsVersion"
testCompile project(":samza-api").sourceSets.test.output
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index e289af0..738e2d0 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -25,7 +25,6 @@
commonsHttpClientVersion = "3.1"
commonsIoVersion = "2.8.0"
commonsLang3Version = "3.11"
- cytodynamicsVersion = "0.2.0"
elasticsearchVersion = "2.2.0"
gsonVersion = "2.8.6"
guavaVersion = "30.1-jre"
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java b/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
deleted file mode 100644
index a0b5d1e..0000000
--- a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-public class DependencyIsolationUtils {
- /**
- * Directory inside the home directory of the cluster-based job coordinator in which the framework API artifacts are
- * placed, for usage in dependency isolation for the cluster-based job coordinator.
- * TODO make this configurable or taken from an environment variable
- */
- public static final String FRAMEWORK_API_DIRECTORY = "__samzaFrameworkApi";
-
- /**
- * Directory inside the home directory of the cluster-based job coordinator in which the framework infrastructure
- * artifacts are placed, for usage in dependency isolation for the cluster-based job coordinator.
- * TODO make this configurable or taken from an environment variable
- */
- public static final String FRAMEWORK_INFRASTRUCTURE_DIRECTORY = "__samzaFrameworkInfrastructure";
-
- /**
- * Directory inside the home directory of the cluster-based job coordinator in which the application artifacts are
- * placed, for usage in dependency isolation for the cluster-based job coordinator.
- * TODO make this configurable or taken from an environment variable
- */
- public static final String APPLICATION_DIRECTORY = "__package";
-
- /**
- * Name of the file which contains the class names (or globs) which should be loaded from the framework API
- * classloader.
- */
- public static final String FRAMEWORK_API_CLASS_LIST_FILE_NAME = "samza-framework-api-classes.txt";
-
- public static final String RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME = "runtime-framework-resources-pathing.jar";
-}
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java b/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
deleted file mode 100644
index 344a034..0000000
--- a/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-import com.linkedin.cytodynamics.matcher.BootstrapClassPredicate;
-import com.linkedin.cytodynamics.matcher.GlobMatcher;
-import com.linkedin.cytodynamics.nucleus.DelegateRelationship;
-import com.linkedin.cytodynamics.nucleus.DelegateRelationshipBuilder;
-import com.linkedin.cytodynamics.nucleus.IsolationLevel;
-import com.linkedin.cytodynamics.nucleus.LoaderBuilder;
-import com.linkedin.cytodynamics.nucleus.OriginRestriction;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Use this to build a classloader for running Samza which isolates the Samza framework code/dependencies from the
- * application code/dependencies.
- */
-public class IsolatingClassLoaderFactory {
- private static final Logger LOG = LoggerFactory.getLogger(IsolatingClassLoaderFactory.class);
-
- private static final String LIB_DIRECTORY = "lib";
-
- /**
- * Build a classloader which will isolate Samza framework code from application code. Samza framework classes and
- * application-specific classes will be loaded using a different classloaders. This will enable dependencies of each
- * category of classes to also be loaded separately, so that runtime dependency conflicts do not happen.
- * Each call to this method will build a different instance of a classloader.
- *
- * Samza framework API classes need to be specified in a file called
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} which is in the lib directory which is in the
- * API package. The file needs to be generated when building the framework API package. This class will not generate
- * the file.
- *
- * Implementation notes:
- *
- * The cytodynamics isolating classloader is used for this. It provides more control than the built-in
- * {@link URLClassLoader}. Cytodynamics provides the ability to compose multiple classloaders together and have more
- * granular delegation strategies between the classloaders.
- *
- * In order to share objects between classes loaded by different classloaders, the classes for the shared objects must
- * be loaded by a common classloader. Those common classes will be loaded through a common API classloader. The
- * cytodynamics classloader can be set up to only use the common API classloader for an explicit set of classes. The
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file should include the framework API classes.
- * Also, bootstrap classes (e.g. java.lang.String) need to be loaded by a common classloader, since objects of those
- * types need to be shared across different framework and application. There are also some static bootstrap classes
- * which should be shared (e.g. java.lang.System). Bootstrap classes will be loaded through a common classloader by
- * default.
- *
- * These are the classloaders which are used to make up the final classloader.
- * <ul>
- * <li>bootstrap classloader: Built-in Java classes (e.g. java.lang.String)</li>
- * <li>API classloader: Common Samza framework API classes</li>
- * <li>infrastructure classloader: Core Samza framework classes and plugins that are included in the framework</li>
- * <li>
- * application classloader: Application code and plugins that are needed in the app but are not included in the
- * framework
- * </li>
- * </ul>
- *
- * This is the delegation structure for the classloaders:
- * <pre>
- * (bootstrap (API (application
- * classloader) <---- classloader) <------- classloader)
- * ^ ^
- * | /
- * | /
- * | /
- * | /
- * (infrastructure classloader)
- * </pre>
- * The cytodynamics classloader allows control over when the delegation should happen.
- * <ol>
- * <li>API classloader delegates to the bootstrap classloader if the bootstrap classloader has the class.</li>
- * <li>
- * Infrastructure classloader only delegates to the API classloader for the common classes specified by
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
- * </li>
- * <li>
- * Infrastructure classloader delegates to the application classloader when a class can't be found in the
- * infrastructure classloader.
- * </li>
- * <li>
- * Application classloader only delegates to the API classloader for the common classes specified by
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
- * </li>
- * </ol>
- */
- public ClassLoader buildClassLoader() {
- // start at the user.dir to find the resources for the classpaths
- File baseJobDirectory = new File(System.getProperty("user.dir"));
- File apiLibDirectory = libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.FRAMEWORK_API_DIRECTORY));
- LOG.info("Using API lib directory: {}", apiLibDirectory);
- File infrastructureLibDirectory =
- libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY));
- LOG.info("Using infrastructure lib directory: {}", infrastructureLibDirectory);
- File applicationLibDirectory =
- libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.APPLICATION_DIRECTORY));
- LOG.info("Using application lib directory: {}", applicationLibDirectory);
-
- ClassLoader apiClassLoader = buildApiClassLoader(apiLibDirectory);
- ClassLoader applicationClassLoader =
- buildApplicationClassLoader(applicationLibDirectory, apiLibDirectory, apiClassLoader);
-
- // the classloader to return is the one with the infrastructure classpath
- return buildInfrastructureClassLoader(infrastructureLibDirectory, baseJobDirectory, apiLibDirectory, apiClassLoader,
- applicationClassLoader);
- }
-
- /**
- * Build the {@link ClassLoader} which can load framework API classes.
- *
- * This sets up the link between the bootstrap classloader and the API classloader (see {@link #buildClassLoader()}.
- */
- private static ClassLoader buildApiClassLoader(File apiLibDirectory) {
- /*
- * This can just use the built-in classloading, which checks the parent classloader first and then checks its own
- * classpath. A null parent means bootstrap classloader, which contains core Java classes (e.g. java.lang.String).
- * This doesn't need to be isolated from the parent, because we only want to load all bootstrap classes from the
- * bootstrap classloader.
- */
- return new URLClassLoader(getClasspathAsURLs(apiLibDirectory), null);
- }
-
- /**
- * Build the {@link ClassLoader} which can load application classes.
- *
- * This sets up the link between the application classloader and the API classloader (see {@link #buildClassLoader()}.
- */
- private static ClassLoader buildApplicationClassLoader(File applicationLibDirectory, File apiLibDirectory,
- ClassLoader apiClassLoader) {
- return LoaderBuilder.anIsolatingLoader()
- // look in application lib directory for JARs
- .withClasspath(getClasspathAsURIs(applicationLibDirectory))
- // getClasspathAsURIs should only return JARs within applicationLibDirectory anyways, but doing it to be safe
- .withOriginRestriction(OriginRestriction.denyByDefault().allowingDirectory(applicationLibDirectory, false))
- // delegate to the api classloader for API classes
- .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
- .build();
- }
-
- /**
- * Build the {@link ClassLoader} which can load Samza framework core classes. If a file with the name
- * {@link DependencyIsolationUtils#RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME} is found in {@code baseJobDirectory},
- * then it will be included in the classpath.
- * This may also fall back to loading application classes.
- *
- * This sets up two links: One link between the infrastructure classloader and the API and another link between the
- * infrastructure classloader and the application classloader (see {@link #buildClassLoader()}.
- */
- private static ClassLoader buildInfrastructureClassLoader(File infrastructureLibDirectory,
- File baseJobDirectory,
- File apiLibDirectory,
- ClassLoader apiClassLoader,
- ClassLoader applicationClassLoader) {
- // start with JARs in infrastructure lib directory
- List<URI> classpathURIs = new ArrayList<>(getClasspathAsURIs(infrastructureLibDirectory));
- OriginRestriction originRestriction = OriginRestriction.denyByDefault()
- // getClasspathAsURIs should only return JARs within infrastructureLibDirectory anyways, but doing it to be safe
- .allowingDirectory(infrastructureLibDirectory, false);
- File runtimeFrameworkResourcesPathingJar =
- new File(baseJobDirectory, DependencyIsolationUtils.RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME);
- if (canAccess(runtimeFrameworkResourcesPathingJar)) {
- // if there is a runtime framework resources pathing JAR, then include that in the classpath as well
- classpathURIs.add(runtimeFrameworkResourcesPathingJar.toURI());
- originRestriction.allowingGlobPattern(fileURL(runtimeFrameworkResourcesPathingJar).toExternalForm());
- LOG.info("Added {} to infrastructure classpath", runtimeFrameworkResourcesPathingJar.getPath());
- } else {
- LOG.info("Unable to access {}, so not adding to infrastructure classpath",
- runtimeFrameworkResourcesPathingJar.getPath());
- }
- return LoaderBuilder.anIsolatingLoader()
- .withClasspath(Collections.unmodifiableList(classpathURIs))
- .withOriginRestriction(originRestriction)
- .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
- /*
- * Fall back to the application classloader for certain classes. For example, the application might implement
- * some pluggable classes (e.g. SystemFactory). Another example is message schemas that are supplied by the
- * application.
- */
- .addFallbackDelegate(DelegateRelationshipBuilder.builder()
- .withDelegateClassLoader(applicationClassLoader)
- /*
- * NONE means that a class will be loaded from here if it is not found in the classpath of the loader that uses
- * this relationship.
- */
- .withIsolationLevel(IsolationLevel.NONE)
- .build())
- .build();
- }
-
- /**
- * Build a {@link DelegateRelationship} which defines how to delegate to the API classloader.
- *
- * Delegation will only happen for classes specified in
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} and the Java bootstrap classes.
- */
- private static DelegateRelationship buildApiParentRelationship(File apiLibDirectory, ClassLoader apiClassLoader) {
- DelegateRelationshipBuilder apiParentRelationshipBuilder = DelegateRelationshipBuilder.builder()
- // needs to load API classes from the API classloader
- .withDelegateClassLoader(apiClassLoader)
- /*
- * FULL means to only load classes explicitly specified as "API" from the API classloader. We will use
- * delegate-preferred class predicates to specify which classes are "API" (see below).
- */
- .withIsolationLevel(IsolationLevel.FULL);
-
- // bootstrap classes need to be loaded from a common classloader
- apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new BootstrapClassPredicate());
- // the classes which are Samza framework API classes are added here
- getFrameworkApiClassGlobs(apiLibDirectory).forEach(
- apiClassName -> apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new GlobMatcher(apiClassName)));
- return apiParentRelationshipBuilder.build();
- }
-
- /**
- * Gets the globs for matching against classes to load from the framework API classloader. This will read the
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file in {@code directoryWithClassList} to get
- * the globs.
- *
- * @param directoryWithClassList Directory in which
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} lives
- * @return {@link List} of globs for matching against classes to load from the framework API classloader
- */
- @VisibleForTesting
- static List<String> getFrameworkApiClassGlobs(File directoryWithClassList) {
- File parentPreferredFile =
- new File(directoryWithClassList, DependencyIsolationUtils.FRAMEWORK_API_CLASS_LIST_FILE_NAME);
- validateCanAccess(parentPreferredFile);
- try {
- return Files.readAllLines(Paths.get(parentPreferredFile.toURI()), StandardCharsets.UTF_8)
- .stream()
- .filter(StringUtils::isNotBlank)
- .collect(Collectors.toList());
- } catch (IOException e) {
- throw new SamzaException("Error while reading samza-api class list", e);
- }
- }
-
- /**
- * Get the {@link URL}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
- * not recursive.
- */
- @VisibleForTesting
- static URL[] getClasspathAsURLs(File jarsLocation) {
- validateCanAccess(jarsLocation);
- File[] filesInJarsLocation = jarsLocation.listFiles();
- if (filesInJarsLocation == null) {
- throw new SamzaException(
- String.format("Could not find any files inside %s, probably because it is not a directory",
- jarsLocation.getPath()));
- }
- URL[] urls = Stream.of(filesInJarsLocation)
- .filter(file -> file.getName().endsWith(".jar") || file.getName().endsWith(".war"))
- .map(IsolatingClassLoaderFactory::fileURL)
- .toArray(URL[]::new);
- LOG.info("Found {} items to load into classpath from {}", urls.length, jarsLocation);
- Stream.of(urls).forEach(url -> LOG.debug("Found {} from {}", url, jarsLocation));
- return urls;
- }
-
- /**
- * Get the {@link URI}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
- * not recursive.
- */
- @VisibleForTesting
- static List<URI> getClasspathAsURIs(File jarsLocation) {
- return Stream.of(getClasspathAsURLs(jarsLocation))
- .map(IsolatingClassLoaderFactory::urlToURI)
- .collect(Collectors.toList());
- }
-
- private static boolean canAccess(File file) {
- return file.exists() && file.canRead();
- }
-
- /**
- * Makes sure that a file exists and can be read.
- */
- private static void validateCanAccess(File file) {
- if (!canAccess(file)) {
- throw new SamzaException("Unable to access file: " + file);
- }
- }
-
- /**
- * Get the {@link URL} for a {@link File}.
- * Converts checked exceptions into {@link SamzaException}s.
- */
- private static URL fileURL(File file) {
- URI uri = file.toURI();
- try {
- return uri.toURL();
- } catch (MalformedURLException e) {
- throw new SamzaException("Unable to get URL for file: " + file, e);
- }
- }
-
- /**
- * Get the {@link URI} for a {@link URL}.
- * Converts checked exceptions into {@link SamzaException}s.
- */
- private static URI urlToURI(URL url) {
- try {
- return url.toURI();
- } catch (URISyntaxException e) {
- throw new SamzaException("Unable to get URI for URL: " + url, e);
- }
- }
-
- /**
- * Get the {@link File} representing the {@link #LIB_DIRECTORY} inside the given {@code file}.
- */
- private static File libDirectory(File file) {
- return new File(file, LIB_DIRECTORY);
- }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
index a152032..523742c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
@@ -26,7 +26,6 @@ import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
-import org.apache.samza.classloader.IsolatingClassLoaderFactory;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
@@ -35,7 +34,6 @@ import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.util.CoordinatorStreamUtil;
-import org.apache.samza.util.SplitDeploymentUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,19 +50,12 @@ public class ClusterBasedJobCoordinatorRunner {
LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. Exiting job coordinator", exception);
System.exit(1);
});
- if (!SplitDeploymentUtil.isSplitDeploymentEnabled()) {
- // no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
- runClusterBasedJobCoordinator(args);
- } else {
- SplitDeploymentUtil.runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(),
- ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
- }
+ runClusterBasedJobCoordinator(args);
System.exit(0);
}
/**
- * This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
- * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
+ * This is the actual execution for the {@link ClusterBasedJobCoordinator}.
*/
@VisibleForTesting
static void runClusterBasedJobCoordinator(String[] args) {
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 021c67d..2aa5c2a 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -143,8 +143,6 @@ public class JobConfig extends MapConfig {
public static final String COORDINATOR_STREAM_FACTORY = "job.coordinatorstream.config.factory";
public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = "org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";
- public static final String JOB_SPLIT_DEPLOYMENT_ENABLED = "job.split.deployment.enabled";
-
private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
// Enable ClusterBasedJobCoordinator aka ApplicationMaster High Availability (AM-HA).
@@ -386,10 +384,6 @@ public class JobConfig extends MapConfig {
return getStandbyTaskReplicationFactor() > 1;
}
- public boolean isSplitDeploymentEnabled() {
- return getBoolean(JOB_SPLIT_DEPLOYMENT_ENABLED, false);
- }
-
/**
* The metadata file is written in a {@code exec-env-container-id}.metadata file in the log-dir of the container.
* Here the {@code exec-env-container-id} refers to the ID assigned by the cluster manager (e.g., YARN) to the container,
diff --git a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
index 73093a8..73bcf8e 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
@@ -57,29 +57,6 @@ public class ShellCommandConfig extends MapConfig {
*/
public static final String ENV_EXECUTION_ENV_CONTAINER_ID = "EXECUTION_ENV_CONTAINER_ID";
- /**
- * Set to "true" if split deployment feature is enabled. Otherwise, will be considered false.
- *
- * The launch process for the cluster-based job coordinator and job container depends on the value of this, since it
- * needs to be known if the cluster-based job coordinator and job container should be launched in a split deployment
- * mode.
- * This needs to be an environment variable, because the value needs to be known before the full configs can be read
- * from the metadata store (full configs are only read after launch is complete).
- */
- public static final String ENV_SPLIT_DEPLOYMENT_ENABLED = "ENV_SPLIT_DEPLOYMENT_ENABLED";
-
- /**
- * When running the cluster-based job coordinator and job container in a split deployment mode, it uses JARs and
- * resources from a lib directory which is provided by the framework. In some cases, it is necessary to use some
- * resources specified by the application as well. This environment variable can be set to a directory which is
- * different from the framework lib directory in order to tell Samza where application resources live.
- * This is an environment variable because it is needed in order to launch the cluster-based job coordinator and job
- * container Java processes, which means access to full configs is not available yet.
- * For example, this is used to set a system property for the location of an application-specified log4j configuration
- * file when launching the cluster-based job coordinator and job container Java processes.
- */
- public static final String ENV_APPLICATION_LIB_DIR = "APPLICATION_LIB_DIR";
-
/*
* The base directory for storing logged data stores used in Samza. This has to be set on all machine running Samza
* containers. For example, when using YARN, it has to be set in all NMs and passed to the containers.
diff --git a/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java b/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
deleted file mode 100644
index 200cd3c..0000000
--- a/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.ShellCommandConfig;
-
-
-public final class SplitDeploymentUtil {
-
- /**
- * The split deployment feature uses system env {@code ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED} to represent
- * if the user chooses to enable it.
- * This function helps to detect if the split deployment feature is enabled.
- *
- * @return true if split deployment is enabled; vice versa
- */
- public static boolean isSplitDeploymentEnabled() {
- return Boolean.parseBoolean(System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
- }
-
- /**
- * Execute the runner class using a separate isolated classloader.
- * @param classLoader {@link ClassLoader} to use to load the runner class which will run
- * @param originalRunnerClass {@link Class} for which will be executed with the new class loader.
- * @param runMethodName run method name of runner class
- * @param runMethodArgs arguments to pass to run method
- */
- public static void runWithClassLoader(ClassLoader classLoader, Class<?> originalRunnerClass, String runMethodName,
- String[] runMethodArgs) {
- // need to use the isolated classloader to load run method and then execute using that new class
- Class<?> runnerClass;
- try {
- runnerClass = classLoader.loadClass(originalRunnerClass.getName());
- } catch (ClassNotFoundException e) {
- throw new SamzaException(String.format(
- "Isolation was enabled, but unable to find %s in isolated classloader", originalRunnerClass.getName()), e);
- }
-
- // save the current context classloader so it can be reset after finishing the call to run method
- ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
- // this is needed because certain libraries (e.g. log4j) use the context classloader
- Thread.currentThread().setContextClassLoader(classLoader);
-
- try {
- executeRunForRunnerClass(runnerClass, runMethodName, runMethodArgs);
- } finally {
- // reset the context class loader; it's good practice, and could be important when running a test suite
- Thread.currentThread().setContextClassLoader(previousContextClassLoader);
- }
- }
-
- private static void executeRunForRunnerClass(Class<?> runnerClass, String runMethodName, String[] runMethodArgs) {
- Method runMethod;
- try {
- runMethod = runnerClass.getDeclaredMethod(runMethodName, String[].class);
- } catch (NoSuchMethodException e) {
- throw new SamzaException(String.format("Isolation was enabled, but unable to find %s method", runMethodName), e);
- }
- // only sets accessible flag for this method instance
- runMethod.setAccessible(true);
-
- try {
- // wrapping args in object array so that args is passed as a single argument to the method
- runMethod.invoke(null, new Object[]{runMethodArgs});
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new SamzaException(String.format("Exception while executing %s method", runMethodName), e);
- }
- }
-}
diff --git a/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java b/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
deleted file mode 100644
index 7444fbf..0000000
--- a/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-import java.io.File;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Set;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.samza.SamzaException;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-public class TestIsolatingClassLoaderFactory {
- @Test
- public void testGetApiClasses() throws URISyntaxException {
- File apiClassListFile = Paths.get(getClass().getResource("/classloader").toURI()).toFile();
- List<String> apiClassNames = IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(apiClassListFile);
- List<String> expected = ImmutableList.of(
- "org.apache.samza.JavaClass",
- "org.apache.samza.JavaClass$InnerJavaClass",
- "org.apache.samza.ScalaClass$",
- "org.apache.samza.ScalaClass$$anon$1",
- "my.package.with.wildcard.*",
- "my.package.with.question.mark?");
- assertEquals(expected, apiClassNames);
- }
-
- @Test(expected = SamzaException.class)
- public void testGetApiClassesFileDoesNotExist() throws URISyntaxException {
- File nonExistentDirectory =
- new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
- IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(nonExistentDirectory);
- }
-
- @Test
- public void testGetClasspathAsURLs() throws URISyntaxException {
- File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
- URL[] classpath = IsolatingClassLoaderFactory.getClasspathAsURLs(classpathDirectory);
- assertEquals(2, classpath.length);
- Set<URL> classpathSet = ImmutableSet.copyOf(classpath);
- URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
- assertTrue(classpathSet.contains(jarUrl));
- URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
- assertTrue(classpathSet.contains(warUrl));
- }
-
- @Test(expected = SamzaException.class)
- public void testGetClasspathAsURLsDirectoryDoesNotExist() throws URISyntaxException {
- File nonExistentDirectory =
- new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
- IsolatingClassLoaderFactory.getClasspathAsURLs(nonExistentDirectory);
- }
-
- @Test
- public void testGetClasspathAsURIs() throws URISyntaxException {
- File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
- List<URI> classpath = IsolatingClassLoaderFactory.getClasspathAsURIs(classpathDirectory);
- assertEquals(2, classpath.size());
- Set<URI> classpathSet = ImmutableSet.copyOf(classpath);
- URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
- assertTrue(classpathSet.contains(jarUrl.toURI()));
- URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
- assertTrue(classpathSet.contains(warUrl.toURI()));
- }
-
- @Test(expected = SamzaException.class)
- public void testGetClasspathAsURIsDirectoryDoesNotExist() throws URISyntaxException {
- File nonExistentDirectory =
- new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
- IsolatingClassLoaderFactory.getClasspathAsURIs(nonExistentDirectory);
- }
-}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 00c2d5e..af4bd1d 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -546,17 +546,6 @@ public class TestJobConfig {
}
@Test
- public void testGetClusterBasedJobCoordinatorDependencyIsolationEnabled() {
- Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
- assertTrue(new JobConfig(config).isSplitDeploymentEnabled());
-
- config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false"));
- assertFalse(new JobConfig(config).isSplitDeploymentEnabled());
-
- assertFalse(new JobConfig(new MapConfig()).isSplitDeploymentEnabled());
- }
-
- @Test
public void testGetMetadataFile() {
String execEnvContainerId = "container-id";
String containerMetadataDirectory = "/tmp/samza/log/dir";
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
deleted file mode 100644
index 72772ba..0000000
--- a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import org.apache.samza.clustermanager.ClusterBasedJobCoordinatorRunner;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.*;
-import static org.mockito.AdditionalMatchers.*;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.*;
-
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ClusterBasedJobCoordinatorRunner.class})
-public class TestSplitDeploymentUtil {
-
- @Test
- public void testRunWithIsolatingClassLoader() throws Exception {
- // partially mock ClusterBasedJobCoordinator (mock runClusterBasedJobCoordinator method only)
- PowerMockito.spy(ClusterBasedJobCoordinatorRunner.class);
- // save the context classloader to make sure that it gets set properly once the test is finished
- ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
- ClassLoader classLoader = mock(ClassLoader.class);
- String[] args = new String[]{"arg0", "arg1"};
- doReturn(ClusterBasedJobCoordinatorRunner.class).when(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
-
- // stub the private static method which is called by reflection
- PowerMockito.doAnswer(invocation -> {
- // make sure the only calls to this method has the expected arguments
- assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
- // checks that the context classloader is set correctly
- assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
- return null;
- }).when(ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", any());
-
- try {
- SplitDeploymentUtil.runWithClassLoader(classLoader,
- ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
- assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
- } finally {
- // reset it explicitly just in case runWithClassLoader throws an exception
- Thread.currentThread().setContextClassLoader(previousContextClassLoader);
- }
- // make sure that the classloader got used
- verify(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
- // make sure runClusterBasedJobCoordinator only got called once
- verifyPrivate(ClusterBasedJobCoordinatorRunner.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
- }
-}
diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh
index 9b5ac30..2dc1ec9 100755
--- a/samza-shell/src/main/bash/run-class.sh
+++ b/samza-shell/src/main/bash/run-class.sh
@@ -53,13 +53,13 @@ export APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
echo APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
echo BASE_LIB_DIR=$BASE_LIB_DIR
-BASE_LIB_CLASSPATH=""
+CLASSPATH=""
# all the jars need to be appended on newlines to ensure line argument length of 72 bytes is not violated
for file in $BASE_LIB_DIR/*.[jw]ar;
do
- BASE_LIB_CLASSPATH=$BASE_LIB_CLASSPATH" $file \n"
+ CLASSPATH=$CLASSPATH" $file \n"
done
-echo generated from BASE_LIB_DIR BASE_LIB_CLASSPATH=$BASE_LIB_CLASSPATH
+echo generated from BASE_LIB_DIR CLASSPATH=$CLASSPATH
# In some cases (AWS) $JAVA_HOME/bin doesn't contain jar.
if [ -z "$JAVA_HOME" ] || [ ! -e "$JAVA_HOME/bin/jar" ]; then
@@ -68,23 +68,10 @@ else
JAR="$JAVA_HOME/bin/jar"
fi
-# Create a pathing JAR for the JARs in the BASE_LIB_DIR
# Newlines and spaces are intended to ensure proper parsing of manifest in pathing jar
-printf "Class-Path: \n $BASE_LIB_CLASSPATH \n" > base-lib-manifest.txt
-# Creates a new archive and adds custom manifest information to base-lib-pathing.jar
-eval "$JAR -cvmf base-lib-manifest.txt base-lib-pathing.jar"
-
-# Create a pathing JAR for the runtime framework resources. It is useful to separate this from the base-lib-pathing.jar
-# because the split deployment framework may only need the resources from this runtime pathing JAR.
-if ! [[ $HADOOP_CONF_DIR =~ .*/$ ]]; then
- # manifest requires a directory to have a trailing slash
- HADOOP_CONF_DIR="$HADOOP_CONF_DIR/"
-fi
-# HADOOP_CONF_DIR should be supplied to classpath explicitly for Yarn to parse configs
-RUNTIME_FRAMEWORK_RESOURCES_CLASSPATH="$HADOOP_CONF_DIR \n"
-# TODO add JARs from ADDITIONAL_CLASSPATH_DIR to runtime-framework-resources-pathing.jar as well
-printf "Class-Path: \n $RUNTIME_FRAMEWORK_RESOURCES_CLASSPATH \n" > runtime-framework-resources-manifest.txt
-eval "$JAR -cvmf runtime-framework-resources-manifest.txt runtime-framework-resources-pathing.jar"
+printf "Class-Path: \n $CLASSPATH \n" > manifest.txt
+# Creates a new archive and adds custom manifest information to pathing.jar
+eval "$JAR -cvmf manifest.txt pathing.jar"
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
@@ -163,11 +150,12 @@ fi
# Check if 64 bit is set. If not - try and set it if it's supported
[[ $JAVA_OPTS != *-d64* ]] && check_and_enable_64_bit_mode
-echo $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar "$@"
+# HADOOP_CONF_DIR should be supplied to classpath explicitly for Yarn to parse configs
+echo $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:pathing.jar "$@"
## If localized resource lib directory is defined, then include it in the classpath.
if [[ -z "${ADDITIONAL_CLASSPATH_DIR}" ]]; then
- exec $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar "$@"
+ exec $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:pathing.jar "$@"
else
- exec $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar:$ADDITIONAL_CLASSPATH_DIR "$@"
-fi
\ No newline at end of file
+ exec $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:pathing.jar:$ADDITIONAL_CLASSPATH_DIR "$@"
+fi
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 7e4565b..237667d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -19,14 +19,11 @@
package org.apache.samza.job.yarn
-import java.lang.Boolean
-
import com.google.common.annotations.VisibleForTesting
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.samza.SamzaException
-import org.apache.samza.classloader.DependencyIsolationUtils
import org.apache.samza.config.{Config, JobConfig, ShellCommandConfig, YarnConfig}
import org.apache.samza.job.ApplicationStatus.{SuccessfulFinish, UnsuccessfulFinish}
import org.apache.samza.job.{ApplicationStatus, StreamJob}
@@ -46,7 +43,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob wit
def submit: YarnJob = {
try {
val jobConfig = new JobConfig(config)
- val cmdExec = YarnJob.buildJobCoordinatorCmd(config, jobConfig)
+ val cmdExec = "./__package/bin/run-jc.sh"
val environment = YarnJob.buildEnvironment(config, this.yarnConfig, jobConfig)
appId = client.submitApplication(
@@ -184,13 +181,6 @@ object YarnJob extends Logging {
Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig))
}
envMapBuilder += ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(yarnConfig.getAmOpts)
- val splitDeploymentEnabled = jobConfig.isSplitDeploymentEnabled
- envMapBuilder += ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED -> Util.envVarEscape(Boolean.toString(splitDeploymentEnabled))
- if (splitDeploymentEnabled) {
- //split deployment is enabled, so need to specify where the application lib directory is for app resources
- envMapBuilder += ShellCommandConfig.ENV_APPLICATION_LIB_DIR ->
- Util.envVarEscape(String.format("./%s/lib", DependencyIsolationUtils.APPLICATION_DIRECTORY))
- }
Option.apply(yarnConfig.getAMJavaHome).foreach {
amJavaHome => envMapBuilder += ShellCommandConfig.ENV_JAVA_HOME -> amJavaHome
}
@@ -198,18 +188,4 @@ object YarnJob extends Logging {
Util.envVarEscape(config.get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, ""))
envMapBuilder.result()
}
-
- /**
- * Build the command for the job coordinator execution.
- * Passing multiple separate config objects so that they can be reused in other places.
- */
- @VisibleForTesting
- private[yarn] def buildJobCoordinatorCmd(config: Config, jobConfig: JobConfig): String = {
- var cmdExec = "./__package/bin/run-jc.sh" // default location
- if (jobConfig.isSplitDeploymentEnabled) {
- cmdExec = "./%s/bin/run-jc.sh" format DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY
- logger.info("Using isolated cluster-based job coordinator path: %s" format cmdExec)
- }
- cmdExec
- }
-}
\ No newline at end of file
+}
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
index f068800..4e2c4a7 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
@@ -21,7 +21,6 @@ package org.apache.samza.job.yarn;
import java.io.IOException;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
-import org.apache.samza.classloader.DependencyIsolationUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
@@ -38,19 +37,6 @@ import static org.junit.Assert.assertEquals;
public class TestYarnJob {
@Test
- public void testBuildJobCoordinatorCmd() {
- // cluster-based job coordinator dependency isolation is not enabled; use script from __package directory
- Config config = new MapConfig();
- assertEquals("./__package/bin/run-jc.sh", YarnJob$.MODULE$.buildJobCoordinatorCmd(config, new JobConfig(config)));
-
- // split deployment is enabled; use script from framework infrastructure directory
- Config splitDeploymentEnabled =
- new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
- assertEquals(String.format("./%s/bin/run-jc.sh", DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY),
- YarnJob$.MODULE$.buildJobCoordinatorCmd(splitDeploymentEnabled, new JobConfig(splitDeploymentEnabled)));
- }
-
- @Test
public void testBuildEnvironment() throws IOException {
String amJvmOptions = "-Xmx1g -Dconfig.key='config value'";
Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
@@ -58,35 +44,12 @@ public class TestYarnJob {
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
.put(YarnConfig.AM_JVM_OPTIONS, amJvmOptions) // needs escaping
- .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
.build());
String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
.writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
ShellCommandConfig.ENV_JAVA_OPTS, Util.envVarEscape(amJvmOptions),
- ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false",
- ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
- assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
- YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
- }
-
- @Test
- public void testBuildEnvironmentJobCoordinatorDependencyIsolationEnabled() throws IOException {
- Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
- .put(JobConfig.JOB_NAME, "jobName")
- .put(JobConfig.JOB_ID, "jobId")
- .put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
- .put(YarnConfig.AM_JVM_OPTIONS, "")
- .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
- .build());
- String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
- .writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
- Map<String, String> expected = ImmutableMap.of(
- ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
- ShellCommandConfig.ENV_JAVA_OPTS, "",
- ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
- ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
@@ -99,7 +62,6 @@ public class TestYarnJob {
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
.put(YarnConfig.AM_JVM_OPTIONS, "")
- .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
.put(YarnConfig.AM_JAVA_HOME, "/some/path/to/java/home")
.build());
String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
@@ -107,7 +69,6 @@ public class TestYarnJob {
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
ShellCommandConfig.ENV_JAVA_OPTS, "",
- ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false",
ShellCommandConfig.ENV_JAVA_HOME, "/some/path/to/java/home",
ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
@@ -121,15 +82,12 @@ public class TestYarnJob {
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory")
.put(YarnConfig.AM_JVM_OPTIONS, "")
- .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
.build());
String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
.writeValueAsString(config));
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
ShellCommandConfig.ENV_JAVA_OPTS, "",
- ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
- ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
@@ -142,7 +100,6 @@ public class TestYarnJob {
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory")
.put(YarnConfig.AM_JVM_OPTIONS, "")
- .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
.put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "./sqlapp/lib/*")
.build());
String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
@@ -150,8 +107,6 @@ public class TestYarnJob {
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
ShellCommandConfig.ENV_JAVA_OPTS, "",
- ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
- ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "./sqlapp/lib/*");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());