You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:23 UTC

[05/50] incubator-gobblin git commit: [GOBBLIN-388] Allow classpath to be configured for JVM-based task execution in Gobblin cluster

[GOBBLIN-388] Allow classpath to be configured for JVM-based task execution in Gobblin cluster

Closes #2265 from yukuai518/classpath


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6a31ef84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6a31ef84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6a31ef84

Branch: refs/heads/0.12.0
Commit: 6a31ef845bd9617cb5cb8fa8ef53f184c3d6dd88
Parents: 11abf9f
Author: Kuai Yu <ku...@linkedin.com>
Authored: Wed Jan 24 21:27:03 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Jan 24 21:27:03 2018 -0800

----------------------------------------------------------------------
 .../cluster/GobblinClusterConfigurationKeys.java       |  3 +++
 .../org/apache/gobblin/cluster/GobblinTaskRunner.java  |  2 +-
 .../org/apache/gobblin/cluster/HelixTaskFactory.java   |  5 +++--
 .../org/apache/gobblin/cluster/SingleTaskLauncher.java | 13 +++++++++++--
 .../apache/gobblin/cluster/SingleTaskLauncherTest.java |  4 +++-
 5 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6a31ef84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 4e78078..de501f1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -30,8 +30,11 @@ public class GobblinClusterConfigurationKeys {
 
   public static final String GOBBLIN_CLUSTER_PREFIX = "gobblin.cluster.";
 
+  // Task separation properties
   public static final String ENABLE_TASK_IN_SEPARATE_PROCESS =
       GOBBLIN_CLUSTER_PREFIX + "enableTaskInSeparateProcess";
+  public static final String TASK_CLASSPATH =
+      GOBBLIN_CLUSTER_PREFIX + "task.classpath";
 
   // General Gobblin Cluster application configuration properties.
   public static final String APPLICATION_NAME_OPTION_NAME = "app_name";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6a31ef84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index dead73b..8816457 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -192,7 +192,7 @@ public class GobblinTaskRunner {
     TaskFactory taskFactory;
     if (isRunTaskInSeparateProcessEnabled) {
       logger.info("Running a task in a separate process is enabled.");
-      taskFactory = new HelixTaskFactory(this.containerMetrics, CLUSTER_CONF_PATH);
+      taskFactory = new HelixTaskFactory(this.containerMetrics, CLUSTER_CONF_PATH, config);
     } else {
       taskFactory = getInProcessTaskFactory();
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6a31ef84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java
index ecb97d5..96ecffc 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Counter;
 import com.google.common.base.Optional;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.util.GobblinProcessBuilder;
 import org.apache.gobblin.util.SystemPropertiesWrapper;
@@ -49,7 +50,7 @@ public class HelixTaskFactory implements TaskFactory {
   private final Optional<Counter> newTasksCounter;
   private final SingleTaskLauncher launcher;
 
-  public HelixTaskFactory(Optional<ContainerMetrics> containerMetrics, Path clusterConfPath) {
+  public HelixTaskFactory(Optional<ContainerMetrics> containerMetrics, Path clusterConfPath, Config sysConfig) {
     this.containerMetrics = containerMetrics;
     if (this.containerMetrics.isPresent()) {
       this.newTasksCounter = Optional
@@ -58,7 +59,7 @@ public class HelixTaskFactory implements TaskFactory {
       this.newTasksCounter = Optional.absent();
     }
     launcher = new SingleTaskLauncher(new GobblinProcessBuilder(), new SystemPropertiesWrapper(),
-        clusterConfPath);
+        clusterConfPath, sysConfig);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6a31ef84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
index 10bad09..1fe3eaf 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
@@ -29,6 +29,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.gobblin.util.GobblinProcessBuilder;
 import org.apache.gobblin.util.SystemPropertiesWrapper;
 
+import com.typesafe.config.Config;
+
 import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.CLUSTER_CONFIG_FILE_PATH;
 import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.JOB_ID;
 import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.WORK_UNIT_FILE_PATH;
@@ -40,12 +42,14 @@ class SingleTaskLauncher {
   private final GobblinProcessBuilder processBuilder;
   private final SystemPropertiesWrapper propertiesWrapper;
   private final Path clusterConfigFilePath;
+  private final Config sysConfig;
 
   SingleTaskLauncher(final GobblinProcessBuilder processBuilder,
-      final SystemPropertiesWrapper propertiesWrapper, final Path clusterConfigFilePath) {
+      final SystemPropertiesWrapper propertiesWrapper, final Path clusterConfigFilePath, Config sysConfig) {
     this.processBuilder = processBuilder;
     this.propertiesWrapper = propertiesWrapper;
     this.clusterConfigFilePath = clusterConfigFilePath;
+    this.sysConfig = sysConfig;
   }
 
   Process launch(final String jobId, final Path workUnitFilePath)
@@ -94,7 +98,12 @@ class SingleTaskLauncher {
 
     private void addClassPath() {
       this.cmd.add("-cp");
-      final String classPath = SingleTaskLauncher.this.propertiesWrapper.getJavaClassPath();
+      String classPath;
+      if (sysConfig.hasPath(GobblinClusterConfigurationKeys.TASK_CLASSPATH)) {
+        classPath = sysConfig.getString(GobblinClusterConfigurationKeys.TASK_CLASSPATH);
+      } else {
+        classPath = SingleTaskLauncher.this.propertiesWrapper.getJavaClassPath();
+      }
       this.cmd.add(classPath);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6a31ef84/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
index a8a361c..afa933d 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
@@ -28,6 +28,8 @@ import org.testng.annotations.Test;
 import org.apache.gobblin.util.GobblinProcessBuilder;
 import org.apache.gobblin.util.SystemPropertiesWrapper;
 
+import com.typesafe.config.ConfigFactory;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -56,7 +58,7 @@ public class SingleTaskLauncherTest {
 
     final Path clusterConfPath = Paths.get(CLUSTER_CONFIG_CONF_PATH);
     final SingleTaskLauncher launcher =
-        new SingleTaskLauncher(processBuilder, propertiesWrapper, clusterConfPath);
+        new SingleTaskLauncher(processBuilder, propertiesWrapper, clusterConfPath, ConfigFactory.empty());
 
     final Path workUnitPath = Paths.get(WORK_UNIT_PATH);
     final Process process = launcher.launch(JOB_ID, workUnitPath);