You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/14 00:36:56 UTC
incubator-gobblin git commit: [GOBBLIN-535] Add second hop for distributed job launcher
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 6232b416a -> 45fc9dfca
[GOBBLIN-535] Add second hop for distributed job launcher
Closes #2398 from yukuai518/hop2
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/45fc9dfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/45fc9dfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/45fc9dfc
Branch: refs/heads/master
Commit: 45fc9dfca69dd8479504ee6b9878d196c722a329
Parents: 6232b41
Author: Kuai Yu <ku...@linkedin.com>
Authored: Fri Jul 13 17:36:51 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Jul 13 17:36:51 2018 -0700
----------------------------------------------------------------------
...blinHelixDistributeJobExecutionLauncher.java | 20 +++++-
.../gobblin/cluster/GobblinHelixJobFactory.java | 17 +++--
.../cluster/GobblinHelixJobLauncher.java | 10 ++-
.../gobblin/cluster/GobblinHelixJobTask.java | 74 ++++++++++++++------
.../gobblin/cluster/GobblinHelixTask.java | 13 ++--
.../gobblin/cluster/GobblinTaskRunner.java | 12 ++--
.../cluster/HelixRetriggeringJobCallable.java | 52 +++++++++++---
.../org/apache/gobblin/cluster/HelixUtils.java | 1 +
.../gobblin/cluster/TaskRunnerSuiteBase.java | 24 +++++++
.../cluster/TaskRunnerSuiteThreadModel.java | 4 +-
.../TaskRunnerSuiteForJobFactoryTest.java | 10 +--
.../suite/IntegrationJobFactorySuite.java | 3 +-
.../gobblin/runtime/api/ExecutionResult.java | 2 +-
13 files changed, 184 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 87613a6..7ef24fc 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.cluster;
+import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.List;
@@ -85,7 +86,7 @@ import org.apache.gobblin.util.PropertiesUtils;
*/
@Alpha
@Slf4j
-class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher {
+class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher, Closeable {
protected HelixManager helixManager;
protected TaskDriver helixTaskDriver;
protected Properties sysProperties;
@@ -100,12 +101,14 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
private final long jobQueueDeleteTimeoutSeconds;
+ private boolean jobSubmitted;
+
public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception {
this.helixManager = builder.manager;
this.helixTaskDriver = new TaskDriver(this.helixManager);
this.sysProperties = builder.sysProperties;
this.jobProperties = builder.jobProperties;
-
+ this.jobSubmitted = false;
Config combined = ConfigUtils.propertiesToConfig(jobProperties)
.withFallback(ConfigUtils.propertiesToConfig(sysProperties));
@@ -124,6 +127,17 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS);
}
+ @Override
+ public void close()
+ throws IOException {
+ // we should delete the planning job at the end.
+ if (this.jobSubmitted) {
+ String planningName = getPlanningJobName(this.jobProperties);
+ log.info("[DELETE] workflow {} in the close.", planningName);
+ this.helixTaskDriver.delete(planningName);
+ }
+ }
+
@Setter
public static class Builder {
Properties sysProperties;
@@ -191,6 +205,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
taskDriver,
this.helixManager,
this.jobQueueDeleteTimeoutSeconds);
+ this.jobSubmitted = true;
}
@Override
@@ -232,6 +247,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
return getResultFromUserContent();
} catch (TimeoutException te) {
helixTaskDriver.waitToStop(planningName, 10L);
+ log.info("[DELETE] workflow {} timeout.", planningName);
this.helixTaskDriver.delete(planningName);
this.helixTaskDriver.resume(planningName);
log.info("stopped the queue, deleted the job");
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
index 2f7ced2..83821d4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
@@ -39,11 +39,12 @@ import org.apache.gobblin.util.PathUtils;
*/
@Slf4j
public class GobblinHelixJobFactory implements TaskFactory {
- protected Config sysConfig;
protected StateStores stateStores;
- public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder) {
- this.sysConfig = builder.getConfig();
+ protected TaskRunnerSuiteBase.Builder builder;
+
+ private void initializeStateStore(TaskRunnerSuiteBase.Builder builder) {
+ Config sysConfig = builder.getConfig();
Path appWorkDir = builder.getAppWorkPath();
URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri();
Config stateStoreJobConfig = sysConfig
@@ -56,8 +57,16 @@ public class GobblinHelixJobFactory implements TaskFactory {
appWorkDir, GobblinHelixDistributeJobExecutionLauncher.PLANNING_JOB_STATE_DIR_NAME);
}
+ public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder) {
+ this.builder = builder;
+ // TODO: We can remove below initialization once Helix allow us to persist job resut in userContentStore
+ initializeStateStore(this.builder);
+ }
+
@Override
public Task createNewTask(TaskCallbackContext context) {
- return new GobblinHelixJobTask(context, this.sysConfig, stateStores);
+ return new GobblinHelixJobTask(context,
+ this.stateStores,
+ this.builder);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 39c6e5b..8794a34 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -45,6 +45,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -96,6 +97,7 @@ import org.apache.gobblin.util.SerializationUtils;
* @author Yinan Li
*/
@Alpha
+@Slf4j
public class GobblinHelixJobLauncher extends AbstractJobLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobLauncher.class);
@@ -136,8 +138,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
this.runningMap = runningMap;
this.appWorkDir = appWorkDir;
this.inputWorkUnitDir = new Path(appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
- this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME +
- Path.SEPARATOR + this.jobContext.getJobId());
+ this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME
+ + Path.SEPARATOR + this.jobContext.getJobId());
this.helixQueueName = this.jobContext.getJobName();
this.jobResourceName = TaskUtil.getNamespacedJobName(this.helixQueueName, this.jobContext.getJobId());
@@ -149,7 +151,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
jobConfig = ConfigUtils.propertiesToConfig(jobProps);
- this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig, GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS,
+ this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig,
+ GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS);
Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
@@ -211,6 +214,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
protected void executeCancellation() {
if (this.jobSubmitted) {
try {
+ log.info("[DELETE] workflow {}", this.helixQueueName);
this.helixTaskDriver.delete(this.helixQueueName);
} catch (IllegalArgumentException e) {
LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index f60852e..9ede090 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -18,9 +18,14 @@
package org.apache.gobblin.cluster;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixManager;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConfig;
@@ -28,61 +33,85 @@ import org.apache.helix.task.TaskResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Closer;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.extractor.partition.Partitioner;
import org.apache.gobblin.util.ConfigUtils;
/**
- * An implementation of Helix's {@link org.apache.helix.task.Task} that runs original {@link GobblinHelixJobLauncher}
+ * An implementation of Helix's {@link org.apache.helix.task.Task} that runs original {@link GobblinHelixJobLauncher}.
*/
@Slf4j
public class GobblinHelixJobTask implements Task {
private final TaskConfig taskConfig;
- private Config sysConfig;
- private Properties jobConfig;
- private StateStores stateStores;
- private String planningJobId;
-
+ private final Config sysConfig;
+ private final Properties jobPlusSysConfig;
+ private final StateStores stateStores;
+ private final String planningJobId;
+ private final HelixManager helixManager;
+ private final Path appWorkDir;
+ private final List<? extends Tag<?>> metadataTags;
+
+ private GobblinHelixJobLauncher launcher;
public GobblinHelixJobTask(TaskCallbackContext context,
- Config sysConfig,
- StateStores stateStores) {
+ StateStores stateStores,
+ TaskRunnerSuiteBase.Builder builder) {
this.taskConfig = context.getTaskConfig();
- this.sysConfig = sysConfig;
- this.jobConfig = ConfigUtils.configToProperties(sysConfig);
+ this.sysConfig = builder.getConfig();
+ this.helixManager = builder.getHelixManager();
+ this.jobPlusSysConfig = ConfigUtils.configToProperties(sysConfig);
Map<String, String> configMap = this.taskConfig.getConfigMap();
for (Map.Entry<String, String> entry: configMap.entrySet()) {
if (entry.getKey().startsWith(GobblinHelixDistributeJobExecutionLauncher.JOB_PROPS_PREFIX)) {
String key = entry.getKey().replaceFirst(GobblinHelixDistributeJobExecutionLauncher.JOB_PROPS_PREFIX, "");
- jobConfig.put(key, entry.getValue());
+ jobPlusSysConfig.put(key, entry.getValue());
}
}
- if (!jobConfig.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) {
+ if (!jobPlusSysConfig.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) {
throw new RuntimeException("Job doesn't have plannning ID");
}
- this.planningJobId = jobConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
+ this.planningJobId = jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
this.stateStores = stateStores;
+ this.appWorkDir = builder.getAppWorkPath();
+ this.metadataTags = Tag.fromMap(new ImmutableMap.Builder<String, Object>()
+ .put(GobblinClusterMetricTagNames.APPLICATION_NAME, builder.getApplicationName())
+ .put(GobblinClusterMetricTagNames.APPLICATION_ID, builder.getApplicationId())
+ .build());
}
- @Override
- public TaskResult run() {
- log.info("We will run planning job " + this.planningJobId);
- // TODO: We should run GobblinHelixJobLauncher#launchJob() here
+ private GobblinHelixJobLauncher createJobLauncher()
+ throws Exception {
+ return new GobblinHelixJobLauncher(jobPlusSysConfig,
+ this.helixManager,
+ this.appWorkDir,
+ this.metadataTags,
+ new ConcurrentHashMap<>());
+ }
- try {
+ @Override
+ public TaskResult run() {
+ log.info("Running planning job {}", this.planningJobId);
+ // Launch the job
+ try (Closer closer = Closer.create()) {
+ this.launcher = createJobLauncher();
+ //TODO: we will provide additional listener
+ closer.register(launcher).launchJob(null);
setResultToUserContent(ImmutableMap.of(Partitioner.IS_EARLY_STOPPED, "false"));
- } catch (IOException e) {
- return new TaskResult(TaskResult.Status.FAILED, "State store cannot be persisted for job " + planningJobId);
+ } catch (Exception e) {
+ return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils
+ .getFullStackTrace(e));
}
return new TaskResult(TaskResult.Status.COMPLETED, "");
}
@@ -102,6 +131,9 @@ public class GobblinHelixJobTask implements Task {
@Override
public void cancel() {
- // TODO: We should delete the real job.
+ log.info("Cancelling planning job {}", this.planningJobId);
+ if (launcher != null) {
+ launcher.executeCancellation();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index e651b8e..a2516eb 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -26,13 +26,13 @@ import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import com.google.common.base.Throwables;
import com.google.common.io.Closer;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.TaskState;
@@ -58,14 +58,14 @@ import org.apache.gobblin.util.Id;
* </p>
*/
@Alpha
+@Slf4j
public class GobblinHelixTask implements Task {
- private static final Logger _logger = LoggerFactory.getLogger(GobblinHelixTask.class);
-
private final TaskConfig taskConfig;
private String jobName;
private String jobId;
private String jobKey;
+ private String taskId;
private Path workUnitFilePath;
private SingleTask task;
@@ -90,22 +90,25 @@ public class GobblinHelixTask implements Task {
this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY);
this.jobId = configMap.get(ConfigurationKeys.JOB_ID_KEY);
this.jobKey = Long.toString(Id.parse(this.jobId).getSequence());
+ this.taskId = configMap.get(ConfigurationKeys.TASK_ID_KEY);
this.workUnitFilePath =
new Path(configMap.get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH));
}
@Override
public TaskResult run() {
+ log.info("Actual task {} started.", this.taskId);
try (Closer closer = Closer.create()) {
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, this.jobName));
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, this.jobKey));
this.task.run();
+ log.info("Actual task {} finished.", this.taskId);
return new TaskResult(TaskResult.Status.COMPLETED, "");
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return new TaskResult(TaskResult.Status.CANCELED, "");
} catch (Throwable t) {
- _logger.error("GobblinHelixTask failed due to " + t.getMessage(), t);
+ log.error("GobblinHelixTask " + taskId + " failed due to " + t.getMessage(), t);
return new TaskResult(TaskResult.Status.FAILED, Throwables.getStackTraceAsString(t));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index fb7136d..9d0e6cc 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -89,9 +89,10 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
* {@link org.apache.gobblin.source.workunit.WorkUnit}s.
*
* <p>
- * This class serves as a Helix participant and it uses a {@link HelixManager} to work with Helix.
- * This class also uses the Helix task execution framework and {@link GobblinHelixTaskFactory} class
- * for creating {@link GobblinHelixTask}s that Helix manages to run Gobblin data ingestion tasks.
+ * This class presents a Helix participant and uses a {@link HelixManager} to communicate with Helix.
+ * It also uses Helix task execution framework and {@link GobblinHelixTaskFactory} class to generate
+ * {@link GobblinHelixTask}s which handles real Gobblin tasks. All the Helix related task framework is
+ * encapsulated in {@link TaskRunnerSuiteBase}.
* </p>
*
* <p>
@@ -174,7 +175,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath)
.setContainerMetrics(this.containerMetrics)
.setFileSystem(this.fs)
- .setHelixManager(this.helixManager).build();
+ .setHelixManager(this.helixManager)
+ .setApplicationId(applicationId)
+ .setApplicationName(applicationName)
+ .build();
this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap());
this.metrics = suite.getTaskMetrics();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index ce3619b..7b9fd3c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -23,6 +23,8 @@ import java.util.concurrent.Callable;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
+import com.google.common.io.Closer;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
@@ -38,7 +40,31 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
- * A {@link Callable} that runs {@link JobLauncher} multiple times iff re-triggering is enabled and job stops early.
+ * A {@link Callable} that can run a given job multiple times iff:
+ * 1) Re-triggering is enabled and
+ * 2) Job stops early.
+ *
+ * Moreover based on the job properties, a job can be processed immediately (non-distributed) or forwarded to a remote
+ * node (distributed) for handling. Details are illustrated as follows:
+ *
+ * <p>
+ * If {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is false, the job will be handled
+ * by {@link HelixRetriggeringJobCallable#launchJobLauncherLoop()}, which simply submits the job to Helix for execution.
+ *
+ * See {@link GobblinHelixJobLauncher} for job launcher details.
+ * </p>
+ *
+ * <p>
+ * If {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is true, the job will be handled
+ * by {@link HelixRetriggeringJobCallable#launchJobExecutionLauncherLoop()}}. It will first create a planning job with
+ * {@link GobblinTaskRunner#GOBBLIN_JOB_FACTORY_NAME} pre-configured, so that Helix can forward this planning job to
+ * any nodes that has implemented the Helix task factory model matching the same name. See {@link TaskRunnerSuiteThreadModel}
+ * implementation of how task factory model is setup.
+ *
+ * Once the planning job reaches to the remote end, it will be handled by {@link GobblinHelixJobTask} which is
+ * created by {@link GobblinHelixJobTask}. The actual handling is similar to the non-distributed mode, where
+ * {@link GobblinHelixJobLauncher} is invoked.
+ * </p>
*/
@Slf4j
@Alpha
@@ -123,16 +149,22 @@ class HelixRetriggeringJobCallable implements Callable {
builder.setManager(this.helixManager);
builder.setAppWorkDir(this.appWorkDir);
- this.currentJobMonitor = builder.build().launchJob(null);
- ExecutionResult result = this.currentJobMonitor.get();
- boolean isEarlyStopped = ((GobblinHelixDistributeJobExecutionLauncher.DistributeJobResult) result).isEarlyStopped();
- boolean isRetriggerEnabled = this.isRetriggeringEnabled();
- if (isEarlyStopped && isRetriggerEnabled) {
- log.info("DistributeJob {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
- } else {
- break;
+ try (Closer closer = Closer.create()) {
+ GobblinHelixDistributeJobExecutionLauncher launcher = builder.build();
+ closer.register(launcher);
+ this.currentJobMonitor = launcher.launchJob(null);
+ ExecutionResult result = this.currentJobMonitor.get();
+ boolean isEarlyStopped = ((GobblinHelixDistributeJobExecutionLauncher.DistributeJobResult) result).isEarlyStopped();
+ boolean isRetriggerEnabled = this.isRetriggeringEnabled();
+ if (isEarlyStopped && isRetriggerEnabled) {
+ log.info("DistributeJob {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ } else {
+ break;
+ }
+ currentJobMonitor = null;
+ } catch (Throwable t) {
+ throw new JobException("Failed to launch and run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t);
}
- currentJobMonitor = null;
}
} catch (Exception e) {
log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index ea4e6f7..8b9d5af 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -99,6 +99,7 @@ public class HelixUtils {
WorkflowConfig workflowConfig = helixTaskDriver.getWorkflowConfig(helixManager, queueName);
+ log.info("[DELETE] workflow {} in the beginning", queueName);
// If the queue is present, but in delete state then wait for cleanup before recreating the queue
if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) {
new TaskDriver(helixManager).deleteAndWaitForCompletion(queueName, jobQueueDeleteTimeoutSeconds);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
index 03d4d42..e65b968 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -53,11 +53,15 @@ public abstract class TaskRunnerSuiteBase {
protected TaskFactory taskFactory;
protected TaskFactory jobFactory;
protected MetricContext metricContext;
+ protected String applicationId;
+ protected String applicationName;
protected StandardMetricsBridge.StandardMetrics taskMetrics;
protected List<Service> services = Lists.newArrayList();
protected TaskRunnerSuiteBase(Builder builder) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(builder.config), this.getClass());
+ this.applicationId = builder.getApplicationId();
+ this.applicationName = builder.getApplicationName();
}
protected MetricContext getMetricContext() {
@@ -70,6 +74,14 @@ public abstract class TaskRunnerSuiteBase {
protected abstract List<Service> getServices();
+ protected String getApplicationId() {
+ return this.applicationId;
+ }
+
+ protected String getApplicationName() {
+ return this.applicationName;
+ }
+
@Getter
public static class Builder {
private Config config;
@@ -77,6 +89,8 @@ public abstract class TaskRunnerSuiteBase {
private Optional<ContainerMetrics> containerMetrics;
private FileSystem fs;
private Path appWorkPath;
+ private String applicationId;
+ private String applicationName;
public Builder(Config config) {
this.config = config;
@@ -87,6 +101,16 @@ public abstract class TaskRunnerSuiteBase {
return this;
}
+ public Builder setApplicationName(String applicationName) {
+ this.applicationName = applicationName;
+ return this;
+ }
+
+ public Builder setApplicationId(String applicationId) {
+ this.applicationId = applicationId;
+ return this;
+ }
+
public Builder setContainerMetrics(Optional<ContainerMetrics> containerMetrics) {
this.containerMetrics = containerMetrics;
return this;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
index 4f3a1e0..cddf519 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
@@ -47,7 +47,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
TaskRunnerSuiteThreadModel(TaskRunnerSuiteBase.Builder builder) {
super(builder);
this.taskExecutor = new TaskExecutor(ConfigUtils.configToProperties(builder.getConfig()));
- this.taskFactory = getInProcessTaskFactory(taskExecutor, builder);
+ this.taskFactory = generateTaskFactory(taskExecutor, builder);
this.jobFactory = new GobblinHelixJobFactory(builder);
this.taskMetrics = new GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, metricContext);
}
@@ -70,7 +70,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
return this.services;
}
- private TaskFactory getInProcessTaskFactory(TaskExecutor taskExecutor, Builder builder) {
+ private TaskFactory generateTaskFactory(TaskExecutor taskExecutor, Builder builder) {
Properties properties = ConfigUtils.configToProperties(builder.getConfig());
URI rootPathUri = PathUtils.getRootPath(builder.getAppWorkPath()).toUri();
Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
index 1fdcda5..476747d 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
@@ -27,7 +27,6 @@ import org.apache.helix.task.TaskFactory;
import org.testng.Assert;
import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -56,19 +55,20 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel
public class TestJobFactory extends GobblinHelixJobFactory {
public TestJobFactory(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder) {
super (builder);
+ this.builder = builder;
}
@Override
public Task createNewTask(TaskCallbackContext context) {
- return new TestHelixJobTask(context, this.sysConfig, stateStores);
+ return new TestHelixJobTask(context, stateStores, builder);
}
}
public class TestHelixJobTask extends GobblinHelixJobTask {
public TestHelixJobTask(TaskCallbackContext context,
- Config sysConfig,
- StateStores stateStores) {
- super(context, sysConfig, stateStores);
+ StateStores stateStores,
+ TaskRunnerSuiteBase.Builder builder) {
+ super(context, stateStores, builder);
}
//TODO: change below to Helix UserConentStore
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
index 0487c9c..5166395 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
@@ -43,7 +43,8 @@ public class IntegrationJobFactorySuite extends IntegrationBasicSuite {
protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, true,
- GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, "TestDistributedExecutionLauncherBuilder"));
+ GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, "TestDistributedExecutionLauncherBuilder"))
+ .withFallback(rawJobConfig);
return ImmutableMap.of("HelloWorldJob", newConfig);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
index 3f77493..3b990f9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java
@@ -18,7 +18,7 @@
package org.apache.gobblin.runtime.api;
/**
- * An object which describes the result after job completion. This can be retrieved by {@link JobExecutionFuture#get()}
+ * An object which describes the result after job completion. This can be retrieved by {@link JobExecutionMonitor#get()}
*
* @see JobExecutionResult as a derived class.
*/