You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2022/04/29 01:25:45 UTC
[samza] branch master updated: SAMZA-2737: Change RemoteAppRunner to plan for local deployments (#1602)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new c4d42a4d2 SAMZA-2737: Change RemoteAppRunner to plan for local deployments (#1602)
c4d42a4d2 is described below
commit c4d42a4d29a71f08e75907c77c8fdc433e0a620a
Author: Daniel Chen <xr...@uwaterloo.ca>
AuthorDate: Thu Apr 28 18:25:40 2022 -0700
SAMZA-2737: Change RemoteAppRunner to plan for local deployments (#1602)
---
.../samza/runtime/RemoteApplicationRunner.java | 27 ++-
.../apache/samza/job/local/ProcessJobFactory.scala | 18 --
.../apache/samza/job/local/ThreadJobFactory.scala | 17 --
.../samza/runtime/TestRemoteApplicationRunner.java | 54 +++++
samza-shell/src/main/bash/run-framework-class.sh | 266 +++++++++++++++++++++
5 files changed, 344 insertions(+), 38 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index bb4ea18df..eea2ea4b4 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -31,6 +31,8 @@ import org.apache.samza.execution.JobPlanner;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.JobRunner;
+import org.apache.samza.job.local.ProcessJobFactory;
+import org.apache.samza.job.local.ThreadJobFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +63,10 @@ public class RemoteApplicationRunner implements ApplicationRunner {
@Override
public void run(ExternalContext externalContext) {
- if (new JobConfig(config).getConfigLoaderFactory().isPresent()) {
+ // By default, RemoteApplicationRunner defers planning to the JobCoordinatorRunner,
+ // with the exception of local deployments
+ JobConfig userJobConfig = new JobConfig(config);
+ if (userJobConfig.getConfigLoaderFactory().isPresent() && !isLocalDeployment(userJobConfig)) {
JobRunner runner = new JobRunner(JobPlanner.generateSingleJobConfig(config));
runner.submit();
return;
@@ -69,7 +74,7 @@ public class RemoteApplicationRunner implements ApplicationRunner {
// TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
try {
- JobPlanner planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(app, config));
+ JobPlanner planner = getRemoteJobPlanner();
List<JobConfig> jobConfigs = planner.prepareJobs();
if (jobConfigs.isEmpty()) {
throw new SamzaException("No jobs to run.");
@@ -117,6 +122,7 @@ public class RemoteApplicationRunner implements ApplicationRunner {
waitForFinish(Duration.ofMillis(0));
}
+
@Override
public boolean waitForFinish(Duration timeout) {
JobConfig jobConfig = new JobConfig(JobPlanner.generateSingleJobConfig(config));
@@ -154,9 +160,24 @@ public class RemoteApplicationRunner implements ApplicationRunner {
}
/* package private */ ApplicationStatus getApplicationStatus(JobConfig jobConfig) {
+ // Bypass recreating the job runner for local deployments
+ // TODO: SAMZA-2738: Return real status for local jobs after avoiding recreating the Job in runner.status()
+ if (isLocalDeployment(jobConfig)) {
+ return Running;
+ }
JobRunner runner = new JobRunner(jobConfig);
ApplicationStatus status = runner.status();
LOG.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()});
return status;
}
-}
+
+ /* package private */ RemoteJobPlanner getRemoteJobPlanner() {
+ return new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(app, config));
+ }
+
+ private boolean isLocalDeployment(JobConfig jobConfig) {
+ String jobFactoryClass = jobConfig.getStreamJobFactoryClass().orElse(null);
+ return ProcessJobFactory.class.getName().equals(jobFactoryClass) ||
+ ThreadJobFactory.class.getName().equals(jobFactoryClass);
+ }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 64e24086f..dbc650b55 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -46,24 +46,6 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
def getJob(submissionConfig: Config): StreamJob = {
var config = submissionConfig
- if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
- val originalConfig = ConfigUtil.loadConfig(submissionConfig)
-
- // Execute planning
- val planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig))
- val jobConfigs = planner.prepareJobs
-
- if (jobConfigs.size != 1) {
- throw new SamzaException("Only single process job is supported.")
- }
-
- // This is the full job config
- config = jobConfigs.get(0)
- // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run
- CoordinatorStreamUtil.writeConfigToCoordinatorStream(config)
- DiagnosticsUtil.createDiagnosticsStream(config)
- }
-
val containerCount = new JobConfig(config).getContainerCount
if (containerCount > 1) {
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index cf560709e..af0f5be14 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -50,23 +50,6 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
def getJob(submissionConfig: Config): StreamJob = {
info("Creating a ThreadJob, which is only meant for debugging.")
var config = submissionConfig
- if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
- val originalConfig = ConfigUtil.loadConfig(submissionConfig)
-
- // Execute planning
- val planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig))
- val jobConfigs = planner.prepareJobs
-
- if (jobConfigs.size != 1) {
- throw new SamzaException("Only single stage job is supported.")
- }
-
- // This is the full job config
- config = jobConfigs.get(0)
- // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run
- CoordinatorStreamUtil.writeConfigToCoordinatorStream(config)
- DiagnosticsUtil.createDiagnosticsStream(config)
- }
val metricsRegistry = new MetricsRegistryMap()
val coordinatorStreamStore: CoordinatorStreamStore = new CoordinatorStreamStore(config, metricsRegistry)
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
index f38269482..105bfcb4b 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
@@ -20,18 +20,26 @@
package org.apache.samza.runtime;
import java.time.Duration;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.StreamJob;
import org.apache.samza.job.StreamJobFactory;
+import org.apache.samza.job.local.ProcessJobFactory;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -104,6 +112,52 @@ public class TestRemoteApplicationRunner {
Assert.assertEquals(ApplicationStatus.Running, runner.getApplicationStatus(new JobConfig(new MapConfig(m))));
}
+ @Test
+ public void testLocalRunWithConfigLoaderFactoryPresent() {
+ Map<String, String> config = new HashMap<>();
+ config.put(ApplicationConfig.APP_NAME, "test-app");
+ config.put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory");
+ config.put(JobConfig.STREAM_JOB_FACTORY_CLASS, ProcessJobFactory.class.getName());
+
+ try {
+ runner.run(null);
+ Assert.fail("Should have went to the planning phase");
+ } catch (SamzaException e) {
+ Assert.assertFalse(e.getMessage().contains("No jobs to run."));
+ }
+ }
+
+ @Ignore //TODO: SAMZA-2738: Return real status for local jobs after avoiding recreating the Job in runner.status()
+ @Test
+ public void testLocalGetStatus() {
+ Map<String, String> m = new HashMap<>();
+ m.put(JobConfig.JOB_NAME, "jobName");
+ m.put(JobConfig.STREAM_JOB_FACTORY_CLASS, ProcessJobFactory.class.getName());
+
+ m.put(JobConfig.JOB_ID, "newJob");
+
+ StreamApplication userApp = appDesc -> { };
+ runner = spy(new RemoteApplicationRunner(userApp, new MapConfig(m)));
+
+ Assert.assertEquals(ApplicationStatus.New, runner.getApplicationStatus(new JobConfig(new MapConfig(m))));
+
+ m.put(JobConfig.JOB_ID, "runningJob");
+ runner = spy(new RemoteApplicationRunner(userApp, new MapConfig(m)));
+ Assert.assertEquals(ApplicationStatus.Running, runner.getApplicationStatus(new JobConfig(new MapConfig(m))));
+ }
+
+ static public class MockRemoteJobPlanner extends RemoteJobPlanner {
+
+ public MockRemoteJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
+ super(descriptor);
+ }
+
+ @Override
+ public List<JobConfig> prepareJobs() {
+ return Collections.emptyList();
+ }
+ }
+
static public class MockStreamJobFactory implements StreamJobFactory {
public MockStreamJobFactory() {
diff --git a/samza-shell/src/main/bash/run-framework-class.sh b/samza-shell/src/main/bash/run-framework-class.sh
new file mode 100644
index 000000000..342047c58
--- /dev/null
+++ b/samza-shell/src/main/bash/run-framework-class.sh
@@ -0,0 +1,266 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if [ $# -lt 1 ];
+then
+ echo "USAGE: $0 classname [opts]"
+ exit 1
+fi
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+echo home_dir=$home_dir
+echo "framework base (location of this script). base_dir=$base_dir"
+
+if [ ! -d "$base_dir/lib" ]; then
+ echo "Unable to find $base_dir/lib, which is required to run."
+ exit 1
+fi
+
+HADOOP_YARN_HOME="${HADOOP_YARN_HOME:-$HOME/.samza}"
+HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$HADOOP_YARN_HOME/conf}"
+GC_LOG_ROTATION_OPTS="-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024"
+LOG4J_FILE_NAME="log4j.xml"
+LOG4J2_FILE_NAME="log4j2.xml"
+BASE_LIB_DIR="$base_dir/lib"
+DEFAULT_LOG4J_FILE=$BASE_LIB_DIR/$LOG4J_FILE_NAME
+DEFAULT_LOG4J2_FILE=$BASE_LIB_DIR/$LOG4J2_FILE_NAME
+
+# For Managed Beam Workflow type jobs, user jar and log4j2.xml will be placed under __userPackage/lib.
+# If __userPackage/lib exists, set APPLICATION_LIB_DIR to __userPackage/lib
+# TODO - use parameter like job type (WORKFLOW_DSL) to decide whether there is a separate user lib path
+if [ -d "$home_dir/__userPackage/lib" ]; then
+ APPLICATION_LIB_DIR="$home_dir/__userPackage/lib"
+ echo APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
+fi
+
+# APPLICATION_LIB_DIR can be a directory which is different from $BASE_LIB_DIR which contains some additional
+# application-specific resources. If it is not set, then $BASE_LIB_DIR will be used as the value.
+APPLICATION_LIB_DIR="${APPLICATION_LIB_DIR:-$BASE_LIB_DIR}"
+export APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
+
+echo APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
+echo BASE_LIB_DIR=$BASE_LIB_DIR
+
+CLASSPATH=""
+
+# This is LinkedIn Hadoop cluster specific dependency! The jar file is needed
+# for the Samza job to run on LinkedIn's Hadoop YARN cluster.
+# There is no clean way to include this dependency anywhere else, so we just
+# manually include it here.
+# Long term fix: make Hadoop YARN cluster officially support Samza job and prepare
+# runtime dependency for us.
+#
+if [ -e /export/apps/hadoop/site/lib/grid-topology-1.0.jar ]; then
+ CLASSPATH=$CLASSPATH" /export/apps/hadoop/site/lib/grid-topology-1.0.jar \n"
+fi
+
+# all the jars need to be appended on newlines to ensure the JAR manifest's 72-byte line-length limit is not violated
+for file in $BASE_LIB_DIR/*.[jw]ar;
+do
+ CLASSPATH=$CLASSPATH" $file \n"
+done
+echo generated from BASE_LIB_DIR CLASSPATH=$CLASSPATH
+
+# when APPLICATION_LIB_DIR is different from BASE_LIB_DIR, meaning it is a Managed Beam Workflow job, append
+# user jars in __userPackage/lib to the classpath
+# TODO - In the initial version of Managed Beam Workflow job, it's ensured that no extra jars are supplied from the
+# user job besides the user jar that only contains the pipeline proto file and non-jar resources. When simple UDF
+# support is to be added, we need to ensure no jar or class collision happens while combining user jars with framework jars
+# on classpath
+USER_CLASSPATH=""
+if [ "$APPLICATION_LIB_DIR" != "$BASE_LIB_DIR" ]; then
+ for file in $APPLICATION_LIB_DIR/*.[jw]ar;
+ do
+ USER_CLASSPATH=$USER_CLASSPATH" $file \n"
+ done
+ echo generated from $APPLICATION_LIB_DIR USER_CLASSPATH=$USER_CLASSPATH
+fi
+
+# In some cases (AWS) $JAVA_HOME/bin doesn't contain jar.
+if [ -z "$JAVA_HOME" ] || [ ! -e "$JAVA_HOME/bin/jar" ]; then
+ JAR="jar"
+else
+ JAR="$JAVA_HOME/bin/jar"
+fi
+
+# Create a separate directory for writing files related to classpath management. It is easier to manage
+# permissions for the classpath-related files when they are in their own directory. An example of where
+# this is helpful is when using container images which might have predefined permissions for certain
+# directories.
+CLASSPATH_WORKSPACE_DIR=$base_dir/classpath_workspace
+mkdir -p $CLASSPATH_WORKSPACE_DIR
+# file containing the classpath string; used to avoid passing long classpaths directly to the jar command
+PATHING_MANIFEST_FILE=$CLASSPATH_WORKSPACE_DIR/manifest.txt
+# jar file to include on the classpath for running the main class
+PATHING_JAR_FILE=$CLASSPATH_WORKSPACE_DIR/pathing.jar
+
+# Newlines and spaces are intended to ensure proper parsing of manifest in pathing jar
+printf "Class-Path: \n $CLASSPATH \n" > $PATHING_MANIFEST_FILE
+# Creates a new archive and adds custom manifest information to pathing.jar
+eval "$JAR -cvmf $PATHING_MANIFEST_FILE $PATHING_JAR_FILE"
+
+if [ -z "$JAVA_HOME" ]; then
+ JAVA="java"
+else
+ JAVA="$JAVA_HOME/bin/java"
+fi
+
+if [ -z "$SAMZA_LOG_DIR" ]; then
+ SAMZA_LOG_DIR="$base_dir"
+fi
+
+# add usercache directory
+mkdir -p $base_dir/tmp
+JAVA_TEMP_DIR=$base_dir/tmp
+
+# Check whether the JVM supports GC Log rotation, and enable it if so.
+function check_and_enable_gc_log_rotation {
+ `$JAVA -Xloggc:/dev/null $GC_LOG_ROTATION_OPTS -version 2> /dev/null`
+ if [ $? -eq 0 ] ; then
+ JAVA_OPTS="$JAVA_OPTS $GC_LOG_ROTATION_OPTS"
+ fi
+}
+
+# Try and use 64-bit mode if available in JVM_OPTS
+function check_and_enable_64_bit_mode {
+ `$JAVA -d64 -version`
+ if [ $? -eq 0 ] ; then
+ JAVA_OPTS="$JAVA_OPTS -d64"
+ fi
+}
+
+### Inherit JVM_OPTS from task.opts configuration, and initialize defaults ###
+
+# Make the MDC inheritable to child threads by setting the system property to true if config not explicitly specified
+[[ $JAVA_OPTS != *-DisThreadContextMapInheritable* ]] && JAVA_OPTS="$JAVA_OPTS -DisThreadContextMapInheritable=true"
+
+# Check if log4j configuration is specified; if not, look for a configuration file:
+# 1) Check if using log4j or log4j2
+# 2) Check if configuration file system property is already set
+# 3) If not, then look in $APPLICATION_LIB_DIR for configuration file (remember that $APPLICATION_LIB_DIR can be same or
+# different from $BASE_LIB_DIR).
+# 4) If still can't find it, fall back to default (from $BASE_LIB_DIR).
+if [[ -n $(find "$BASE_LIB_DIR" -regex ".*samza-log4j2.*.jar*") ]]; then
+ if [[ $JAVA_OPTS != *-Dlog4j.configurationFile* ]]; then
+ if [[ -n $(find "$APPLICATION_LIB_DIR" -maxdepth 1 -name $LOG4J2_FILE_NAME) ]]; then
+ export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configurationFile=file:$APPLICATION_LIB_DIR/$LOG4J2_FILE_NAME"
+ # LI-ONLY CONFIG: Used to set log4j2 configurations for util-log (xeril) to be printed to the same .log file
+ [[ $JAVA_OPTS != *-Dlog4j2.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j2.configuration=file:$APPLICATION_LIB_DIR/$LOG4J2_FILE_NAME"
+ else
+ export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configurationFile=file:$DEFAULT_LOG4J2_FILE"
+ # LI-ONLY CONFIG: Used to set log4j2 configurations for util-log (xeril) to be printed to the same .log file
+ [[ $JAVA_OPTS != *-Dlog4j2.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j2.configuration=file:$DEFAULT_LOG4J2_FILE"
+ fi
+ fi
+elif [[ -n $(find "$BASE_LIB_DIR" -regex ".*samza-log4j.*.jar*") ]]; then
+ if [[ $JAVA_OPTS != *-Dlog4j.configuration* ]]; then
+ if [[ -n $(find "$APPLICATION_LIB_DIR" -maxdepth 1 -name $LOG4J_FILE_NAME) ]]; then
+ export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$APPLICATION_LIB_DIR/$LOG4J_FILE_NAME"
+ else
+ export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$DEFAULT_LOG4J_FILE"
+ fi
+ fi
+fi
+
+# Check if samza.log.dir is specified. If not - set to environment variable if it is set
+[[ $JAVA_OPTS != *-Dsamza.log.dir* && ! -z "$SAMZA_LOG_DIR" ]] && JAVA_OPTS="$JAVA_OPTS -Dsamza.log.dir=$SAMZA_LOG_DIR"
+
+# Check if java.io.tmpdir is specified. If not - set to tmp in the base_dir
+[[ $JAVA_OPTS != *-Djava.io.tmpdir* ]] && JAVA_OPTS="$JAVA_OPTS -Djava.io.tmpdir=$JAVA_TEMP_DIR"
+
+# Check if a max-heap size is specified. If not - set a 768M heap
+[[ $JAVA_OPTS != *-Xmx* ]] && JAVA_OPTS="$JAVA_OPTS -Xmx768M"
+
+# Check if the GC related flags are specified. If not - add the respective flags to JVM_OPTS.
+[[ $JAVA_OPTS != *PrintGCDateStamps* && $JAVA_OPTS != *-Xloggc* ]] && JAVA_OPTS="$JAVA_OPTS -XX:+PrintGCDateStamps -Xloggc:$SAMZA_LOG_DIR/gc.log"
+
+# Check if GC log rotation is already enabled. If not - add the respective flags to JVM_OPTS
+[[ $JAVA_OPTS != *UseGCLogFileRotation* ]] && check_and_enable_gc_log_rotation
+
+# Check if 64 bit is set. If not - try and set it if it's supported
+[[ $JAVA_OPTS != *-d64* ]] && check_and_enable_64_bit_mode
+
+# Linkedin-specific: Add JVM option to guarantee exit on OOM
+JAVA_OPTS="${JAVA_OPTS} -XX:+ExitOnOutOfMemoryError"
+
+# Linkedin-specific: Add pre-installed boot jars to JVM boot class path, e.g. support Jetty Http2 communication by ALPN
+function setup_boot_jars_in_jvm_boot_classpath() {
+ # Get and format java version to follow SI's convention, e.g. 1.8.0_172 to 1_8_0_172
+ RAW_JAVA_VERSION=$($JAVA -version 2>&1 | awk -F '"' '/version/ {print $2}')
+ FORMATTED_JAVA_VERSION=${RAW_JAVA_VERSION//./_}
+
+ # The boot jars are pre-installed in different paths for Mac OS and Linux OS
+ if [[ "$OSTYPE" == "darwin"* ]]; then
+ BOOT_JAR_HOME_DIR="/Library/Java/Boot/${FORMATTED_JAVA_VERSION}/"
+ else
+ BOOT_JAR_HOME_DIR="/export/apps/jdkboot/${FORMATTED_JAVA_VERSION}/"
+ fi
+
+ # Add all the files under boot jar home dir in path separated with colons
+ BOOT_JAR_PATHS=""
+ if [ -d "${BOOT_JAR_HOME_DIR}" ]; then
+ for entry in "$BOOT_JAR_HOME_DIR"*
+ do
+ BOOT_JAR_PATHS="${BOOT_JAR_PATHS}${entry}:"
+ done
+ else
+ echo "No boot jar is pre-installed under dir: ${BOOT_JAR_HOME_DIR}"
+ fi
+ if [ -z "$BOOT_JAR_PATHS" ]; then
+ return
+ fi
+ # Remove last colon
+ BOOT_JAR_PATHS="$(echo $BOOT_JAR_PATHS | sed 's/.$//')"
+
+ JVM_BOOT_CLASSPATH_CONFIG_KEY="-Xbootclasspath"
+ # If "-Xbootclasspath" is already setup in JAVA_OPTS, rebuild the value with found boot jar paths
+ if [[ "$JAVA_OPTS" =~ "$JVM_BOOT_CLASSPATH_CONFIG_KEY" ]]; then
+ NEW_JAVA_OPTS=""
+ for option in $JAVA_OPTS
+ do
+ if [[ "$option" == "${JVM_BOOT_CLASSPATH_CONFIG_KEY}"* ]]; then
+ JVM_BOOT_CLASSPATH="${option}:${BOOT_JAR_PATHS}"
+ else
+ NEW_JAVA_OPTS="${NEW_JAVA_OPTS} $option"
+ fi
+ done
+ JAVA_OPTS="${NEW_JAVA_OPTS}"
+ else
+ JVM_BOOT_CLASSPATH="${JVM_BOOT_CLASSPATH_CONFIG_KEY}/p:${BOOT_JAR_PATHS}"
+ fi
+
+ JAVA_OPTS="${JAVA_OPTS} ${JVM_BOOT_CLASSPATH}"
+}
+setup_boot_jars_in_jvm_boot_classpath
+
+# HADOOP_CONF_DIR should be supplied to classpath explicitly for Yarn to parse configs
+echo $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:$PATHING_JAR_FILE "$@"
+
+## If localized resource lib directory is defined, then include it in the classpath.
+if [[ -z "${ADDITIONAL_CLASSPATH_DIR}" ]]; then
+ # LI-specific: Adding option to invoke script on OOM here because adding it in JAVA_OPTS causes encoding issues https://stackoverflow.com/questions/12532051/xxonoutofmemoryerror-cmd-arg-gives-error-could-not-find-or-load-main-c
+ exec $JAVA $JAVA_OPTS -XX:OnOutOfMemoryError="$BASE_LIB_DIR/../bin/handle-oom.sh $SAMZA_LOG_DIR" -cp $HADOOP_CONF_DIR:$PATHING_JAR_FILE "$@"
+else
+ # LI-specific: Adding option to invoke script on OOM here because adding it in JAVA_OPTS causes encoding issues https://stackoverflow.com/questions/12532051/xxonoutofmemoryerror-cmd-arg-gives-error-could-not-find-or-load-main-c
+ exec $JAVA $JAVA_OPTS -XX:OnOutOfMemoryError="$BASE_LIB_DIR/../bin/handle-oom.sh $SAMZA_LOG_DIR" -cp $HADOOP_CONF_DIR:$PATHING_JAR_FILE:$ADDITIONAL_CLASSPATH_DIR "$@"
+fi