You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:29 UTC
[11/50] incubator-gobblin git commit: [GOBBLIN-398] Upgrade helix to 0.6.9
[GOBBLIN-398] Upgrade helix to 0.6.9
Closes #2272 from htran1/helix_069
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d29b72f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d29b72f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d29b72f4
Branch: refs/heads/0.12.0
Commit: d29b72f4997b4a435397353bb93a66ac4213d55e
Parents: ff13dde
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Jan 31 17:29:21 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Jan 31 17:29:21 2018 -0800
----------------------------------------------------------------------
.../gobblin/cluster/GobblinClusterManager.java | 15 +-
.../cluster/GobblinHelixJobLauncher.java | 72 +-
.../gobblin/cluster/GobblinHelixTaskDriver.java | 296 +-------
.../apache/helix/task/GobblinJobRebalancer.java | 713 -------------------
.../cluster/GobblinHelixJobLauncherTest.java | 22 +-
gradle/scripts/computeVersions.gradle | 2 +-
gradle/scripts/dependencyDefinitions.gradle | 2 +-
7 files changed, 57 insertions(+), 1065 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 6b53c6c..77e511e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.cluster;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -60,8 +59,8 @@ import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
+import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -261,21 +260,17 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
// Clean up existing jobs
TaskDriver taskDriver = new TaskDriver(this.helixManager);
- GobblinHelixTaskDriver gobblinHelixTaskDriver = new GobblinHelixTaskDriver(this.helixManager);
Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows();
for (Map.Entry<String, WorkflowConfig> entry : workflows.entrySet()) {
String queueName = entry.getKey();
WorkflowConfig workflowConfig = entry.getValue();
- for (String namespacedJobName : workflowConfig.getJobDag().getAllNodes()) {
- String jobName = TaskUtil.getDenamespacedJobName(queueName, namespacedJobName);
- LOGGER.info("job {} found for queue {} ", jobName, queueName);
+ // request delete if not already requested
+ if (workflowConfig.getTargetState() != TargetState.DELETE) {
+ taskDriver.delete(queueName);
- // #HELIX-0.6.7-WORKAROUND
- // working around 0.6.7 delete job issue for queues with IN_PROGRESS state
- gobblinHelixTaskDriver.deleteJob(queueName, jobName);
- LOGGER.info("deleted job {} from queue {}", jobName, queueName);
+ LOGGER.info("Requested delete of queue {}", queueName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 1a39dfb..af15469 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -30,17 +30,14 @@ import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
-import org.apache.helix.IdealStateChangeListener;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.task.GobblinJobRebalancer;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,36 +167,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(),
this.eventBus, this.stateStores.getTaskStateStore(), outputTaskStateDir);
-
- if (Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) {
- // Fix-up Ideal State with a custom rebalancer that will re-balance long-running jobs
- final String clusterName =
- ConfigUtils.getString(jobConfig, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, "");
- final String rebalancerToReplace = "org.apache.helix.task.JobRebalancer";
- final String rebalancerClassDesired = GobblinJobRebalancer.class.getName();
- final String jobResourceName = this.jobResourceName;
-
- if (!clusterName.isEmpty()) {
- this.helixManager.addIdealStateChangeListener(new IdealStateChangeListener() {
- @Override
- public void onIdealStateChange(List<IdealState> list, NotificationContext notificationContext) {
- HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
- for (String resource : helixAdmin.getResourcesInCluster(clusterName)) {
- if (resource.equals(jobResourceName)) {
- IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resource);
- if (idealState != null) {
- String rebalancerClassFound = idealState.getRebalancerClassName();
- if (rebalancerToReplace.equals(rebalancerClassFound)) {
- idealState.setRebalancerClassName(rebalancerClassDesired);
- helixAdmin.setResourceIdealState(clusterName, resource, idealState);
- }
- }
- }
- }
- }
- });
- }
- }
}
@Override
@@ -240,15 +207,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
protected void executeCancellation() {
if (this.jobSubmitted) {
try {
- // #HELIX-0.6.7-WORKAROUND
- // working around helix 0.6.7 job delete issue with custom taskDriver
- LOGGER.info("Cancelling job {} in Helix", this.jobContext.getJobId());
- GobblinHelixTaskDriver taskDriver = new GobblinHelixTaskDriver(this.helixManager);
- taskDriver.deleteJob(this.helixQueueName, this.jobContext.getJobId());
- LOGGER.info("Job {} in cancelled Helix", this.jobContext.getJobId());
-
- taskDriver.deleteWorkflow(this.helixQueueName, this.jobQueueDeleteTimeoutSeconds);
- } catch (InterruptedException | IllegalArgumentException e) {
+ this.helixTaskDriver.delete(this.helixQueueName);
+ } catch (IllegalArgumentException e) {
LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e);
}
}
@@ -293,6 +253,11 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
jobConfigBuilder.setNumConcurrentTasksPerInstance(ConfigUtils.getInt(jobConfig,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));
+
+ if (Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) {
+ jobConfigBuilder.setRebalanceRunningTask(true);
+ }
+
return jobConfigBuilder;
}
@@ -300,18 +265,27 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
* Submit a job to run.
*/
private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception {
+ WorkflowConfig workflowConfig = this.helixTaskDriver.getWorkflowConfig(this.helixManager, this.helixQueueName);
+
+ // If the queue is present, but in delete state then wait for cleanup before recreating the queue
+ if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) {
+ GobblinHelixTaskDriver gobblinHelixTaskDriver = new GobblinHelixTaskDriver(this.helixManager);
+ gobblinHelixTaskDriver.deleteWorkflow(this.helixQueueName, this.jobQueueDeleteTimeoutSeconds);
+ // if we get here then the workflow was successfully deleted
+ workflowConfig = null;
+ }
+
// Create one queue for each job with the job name being the queue name
- if (null == this.helixTaskDriver.getWorkflowConfig(this.helixManager, this.helixQueueName)) {
- JobQueue jobQueue = new JobQueue.Builder(this.helixQueueName).build();
- this.helixTaskDriver.createQueue(jobQueue);
- LOGGER.info("Created job queue {}", this.helixQueueName);
+ if (workflowConfig == null) {
+ JobQueue jobQueue = new JobQueue.Builder(this.helixQueueName).build();
+ this.helixTaskDriver.createQueue(jobQueue);
+ LOGGER.info("Created job queue {}", this.helixQueueName);
} else {
LOGGER.info("Job queue {} already exists", this.helixQueueName);
}
// Put the job into the queue
this.helixTaskDriver.enqueueJob(this.jobContext.getJobName(), this.jobContext.getJobId(), jobConfigBuilder);
-
}
public void launchJob(@Nullable JobListener jobListener)
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
index cedb111..ebe2b52 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
@@ -16,54 +16,23 @@
*/
package org.apache.gobblin.cluster;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.IdealState;
import org.apache.helix.store.HelixPropertyStore;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.task.JobDag;
import org.apache.helix.task.TargetState;
-import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
/**
* #HELIX-0.6.7-WORKAROUND
* Replacement TaskDriver methods to workaround bugs and changes in behavior for the 0.6.7 upgrade
*/
public class GobblinHelixTaskDriver {
- /** For logging */
- private static final Logger LOG = Logger.getLogger(GobblinHelixTaskDriver.class);
-
- private final HelixDataAccessor _accessor;
- private final ConfigAccessor _cfgAccessor;
- private final HelixPropertyStore<ZNRecord> _propertyStore;
- private final HelixAdmin _admin;
- private final String _clusterName;
private final TaskDriver _taskDriver;
public GobblinHelixTaskDriver(HelixManager manager) {
@@ -71,277 +40,24 @@ public class GobblinHelixTaskDriver {
.getConfigAccessor(), manager.getHelixPropertyStore(), manager.getClusterName());
}
- public GobblinHelixTaskDriver(ZkClient client, String clusterName) {
- this(client, new ZkBaseDataAccessor<ZNRecord>(client), clusterName);
- }
-
- public GobblinHelixTaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
- this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
- new ConfigAccessor(client), new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
- PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName), null), clusterName);
- }
-
public GobblinHelixTaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor,
HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {
- _admin = admin;
- _accessor = accessor;
- _cfgAccessor = cfgAccessor;
- _propertyStore = propertyStore;
- _clusterName = clusterName;
_taskDriver = new TaskDriver(admin, accessor, cfgAccessor, propertyStore, clusterName);
}
/**
- * Delete a job from an existing named queue,
- * the queue has to be stopped prior to this call
- *
- * @param queueName
- * @param jobName
- */
- public void deleteJob(final String queueName, final String jobName) {
- WorkflowConfig workflowCfg =
- _taskDriver.getWorkflowConfig(queueName);
-
- if (workflowCfg == null) {
- throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
- }
- if (workflowCfg.isTerminable()) {
- throw new IllegalArgumentException(queueName + " is not a queue!");
- }
-
- boolean isRecurringWorkflow =
- (workflowCfg.getScheduleConfig() != null && workflowCfg.getScheduleConfig().isRecurring());
-
- if (isRecurringWorkflow) {
- WorkflowContext wCtx = _taskDriver.getWorkflowContext(queueName);
-
- String lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-
- // delete the current scheduled one
- deleteJobFromScheduledQueue(lastScheduledQueue, jobName, true);
-
- // Remove the job from the original queue template's DAG
- removeJobFromDag(queueName, jobName);
-
- // delete the ideal state and resource config for the template job
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
- _admin.dropResource(_clusterName, namespacedJobName);
-
- // Delete the job template from property store
- String jobPropertyPath =
- Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName);
- _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
- } else {
- deleteJobFromScheduledQueue(queueName, jobName, false);
- }
- }
-
- /**
- * delete a job from a scheduled (non-recurrent) queue.
- *
- * @param queueName
- * @param jobName
- */
- private void deleteJobFromScheduledQueue(final String queueName, final String jobName,
- boolean isRecurrent) {
- WorkflowConfig workflowCfg = _taskDriver.getWorkflowConfig(queueName);
-
- if (workflowCfg == null) {
- // When try to delete recurrent job, it could be either not started or finished. So
- // there may not be a workflow config.
- if (isRecurrent) {
- return;
- } else {
- throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
- }
- }
-
- WorkflowContext wCtx = _taskDriver.getWorkflowContext(queueName);
- if (wCtx != null && wCtx.getWorkflowState() == null) {
- throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!");
- }
-
- // #HELIX-0.6.7-WORKAROUND
- // This check is removed to get the same behavior as 0.6.6-SNAPSHOT until new APIs to support delete are provided
- //String workflowState =
- // (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
- //if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
- // throw new IllegalStateException("Queue " + queueName + " is still in progress!");
- //}
-
- removeJob(queueName, jobName);
- }
-
- private boolean removeJobContext(HelixPropertyStore<ZNRecord> propertyStore,
- String jobResource) {
- return propertyStore.remove(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource),
- AccessOption.PERSISTENT);
- }
-
- private void removeJob(String queueName, String jobName) {
- // Remove the job from the queue in the DAG
- removeJobFromDag(queueName, jobName);
-
- // delete the ideal state and resource config for the job
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
- _admin.dropResource(_clusterName, namespacedJobName);
-
- // update queue's property to remove job from JOB_STATES if it is already started.
- removeJobStateFromQueue(queueName, jobName);
-
- // Delete the job from property store
- removeJobContext(_propertyStore, namespacedJobName);
- }
-
- /** Remove the job name from the DAG from the queue configuration */
- private void removeJobFromDag(final String queueName, final String jobName) {
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-
- DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- if (currentData == null) {
- LOG.error("Could not update DAG for queue: " + queueName + " ZNRecord is null.");
- return null;
- }
- // Add the node to the existing DAG
- JobDag jobDag = JobDag.fromJson(
- currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
- Set<String> allNodes = jobDag.getAllNodes();
- if (!allNodes.contains(namespacedJobName)) {
- LOG.warn(
- "Could not delete job from queue " + queueName + ", job " + jobName + " not exists");
- return currentData;
- }
- String parent = null;
- String child = null;
- // remove the node from the queue
- for (String node : allNodes) {
- if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
- parent = node;
- jobDag.removeParentToChild(parent, namespacedJobName);
- } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
- child = node;
- jobDag.removeParentToChild(namespacedJobName, child);
- }
- }
- if (parent != null && child != null) {
- jobDag.addParentToChild(parent, child);
- }
- jobDag.removeNode(namespacedJobName);
-
- // Save the updated DAG
- try {
- currentData
- .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
- } catch (Exception e) {
- throw new IllegalStateException(
- "Could not remove job " + jobName + " from DAG of queue " + queueName, e);
- }
- return currentData;
- }
- };
-
- String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
- if (!_accessor.getBaseDataAccessor().update(path, dagRemover, AccessOption.PERSISTENT)) {
- throw new IllegalArgumentException(
- "Could not remove job " + jobName + " from DAG of queue " + queueName);
- }
- }
-
- /** update queue's property to remove job from JOB_STATES if it is already started. */
- private void removeJobStateFromQueue(final String queueName, final String jobName) {
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
- String queuePropertyPath =
- Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
-
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- if (currentData != null) {
- Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES);
- if (states != null && states.containsKey(namespacedJobName)) {
- states.keySet().remove(namespacedJobName);
- }
- }
- return currentData;
- }
- };
- if (!_propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT)) {
- LOG.warn("Fail to remove job state for job " + namespacedJobName + " from queue " + queueName);
- }
- }
-
- /**
- * Trigger a controller pipeline execution for a given resource.
- *
- * @param accessor Helix data accessor
- * @param resource the name of the resource changed to triggering the execution
- */
- private void invokeRebalance(HelixDataAccessor accessor, String resource) {
- // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run
- LOG.info("invoke rebalance for " + resource);
- PropertyKey key = accessor.keyBuilder().idealStates(resource);
- IdealState is = accessor.getProperty(key);
- if (is != null && is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
- if (!accessor.updateProperty(key, is)) {
- LOG.warn("Failed to invoke rebalance on resource " + resource);
- }
- } else {
- LOG.warn("Can't find ideal state or ideal state is not for right type for " + resource);
- }
- }
-
- /** Helper function to change target state for a given workflow */
- private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
- LOG.info("Set " + workflowName + " to target state " + state);
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- if (currentData != null) {
- // Only update target state for non-completed workflows
- String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME);
- if (finishTime == null || finishTime.equals(String.valueOf(WorkflowContext.UNFINISHED))) {
- currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
- state.name());
- } else {
- LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime);
- }
- } else {
- LOG.error("TargetState DataUpdater: Fails to update target state ");
- }
- return currentData;
- }
- };
- List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
- List<String> paths = Lists.newArrayList();
-
- PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, workflowName);
- if (_accessor.getProperty(cfgKey) != null) {
- paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
- updaters.add(updater);
- _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
- invokeRebalance(_accessor, workflowName);
- } else {
- LOG.error("Configuration path " + cfgKey + " not found!");
- }
- }
-
- /**
* Delete the workflow
*
* @param workflow The workflow name
* @param timeout The timeout for deleting the workflow/queue in seconds
*/
public void deleteWorkflow(String workflow, long timeout) throws InterruptedException {
- // #HELIX-0.6.7-WORKAROUND
- // Helix 0.6.7 has a bug where TaskDriver.delete(workflow) will delete all resources with a
- // workflow as the prefix. Work around the bug by pulling in the code from TaskDriver and calling
- // setSingleWorkflowTargetState directly to bypass the prefix matching code.
- setSingleWorkflowTargetState(workflow, TargetState.DELETE);
+ WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(workflow);
+
+ // set the target state if not already set
+ if (workflowConfig != null && workflowConfig.getTargetState() != TargetState.DELETE) {
+ _taskDriver.delete(workflow);
+ }
long endTime = System.currentTimeMillis() + (timeout * 1000);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java b/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java
deleted file mode 100644
index 25241d5..0000000
--- a/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java
+++ /dev/null
@@ -1,713 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.helix.task;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
-/**
- * Custom rebalancer implementation for the {@code Job} in task model.
- */
-public class GobblinJobRebalancer extends TaskRebalancer {
- private static final Logger LOG = Logger.getLogger(GobblinJobRebalancer.class);
- private static TaskAssignmentCalculator fixTaskAssignmentCal =
- new FixedTargetTaskAssignmentCalculator();
- private static TaskAssignmentCalculator genericTaskAssignmentCal =
- new GenericTaskAssignmentCalculator();
-
- private static final String PREV_RA_NODE = "PreviousResourceAssignment";
-
- @Override
- public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
- IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
- final String jobName = resource.getResourceName();
- LOG.debug("Computer Best Partition for job: " + jobName);
-
- // Fetch job configuration
- JobConfig jobCfg = TaskUtil.getJobCfg(_manager, jobName);
- if (jobCfg == null) {
- LOG.error("Job configuration is NULL for " + jobName);
- return buildEmptyAssignment(jobName, currStateOutput);
- }
- String workflowResource = jobCfg.getWorkflow();
-
- // Fetch workflow configuration and context
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
- if (workflowCfg == null) {
- LOG.error("Workflow configuration is NULL for " + jobName);
- return buildEmptyAssignment(jobName, currStateOutput);
- }
-
- WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
- if (workflowCtx == null) {
- LOG.error("Workflow context is NULL for " + jobName);
- return buildEmptyAssignment(jobName, currStateOutput);
- }
-
- TargetState targetState = workflowCfg.getTargetState();
- if (targetState != TargetState.START && targetState != TargetState.STOP) {
- LOG.info("Target state is " + targetState.name() + " for workflow " + workflowResource
- + ".Stop scheduling job " + jobName);
- return buildEmptyAssignment(jobName, currStateOutput);
- }
-
- // Stop current run of the job if workflow or job is already in final state (failed or completed)
- TaskState workflowState = workflowCtx.getWorkflowState();
- TaskState jobState = workflowCtx.getJobState(jobName);
- // The job is already in a final state (completed/failed).
- if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED ||
- jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
- LOG.info(String.format(
- "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
- workflowResource, jobName, workflowState, jobState));
- cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
- _scheduledRebalancer.removeScheduledRebalance(jobName);
- return buildEmptyAssignment(jobName, currStateOutput);
- }
-
- if (!isWorkflowReadyForSchedule(workflowCfg)) {
- LOG.info("Job is not ready to be run since workflow is not ready " + jobName);
- return buildEmptyAssignment(jobName, currStateOutput);
- }
-
- if (!isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
- workflowCtx)) {
- LOG.info("Job is not ready to run " + jobName);
- return buildEmptyAssignment(jobName, currStateOutput);
- }
-
- // Fetch any existing context information from the property store.
- JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName);
- if (jobCtx == null) {
- jobCtx = new JobContext(new ZNRecord("TaskContext"));
- jobCtx.setStartTime(System.currentTimeMillis());
- }
-
- // Grab the old assignment, or an empty one if it doesn't exist
- ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName);
- if (prevAssignment == null) {
- prevAssignment = new ResourceAssignment(jobName);
- }
-
- // Will contain the list of partitions that must be explicitly dropped from the ideal state that
- // is stored in zk.
- // Fetch the previous resource assignment from the property store. This is required because of
- // HELIX-230.
- Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null
- ? clusterData.getAllEnabledLiveInstances()
- : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
-
- if (liveInstances.isEmpty()) {
- LOG.error("No available instance found for job!");
- }
-
- Set<Integer> partitionsToDrop = new TreeSet<Integer>();
- ResourceAssignment newAssignment =
- computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, liveInstances,
- currStateOutput, workflowCtx, jobCtx, partitionsToDrop, clusterData);
-
- if (!partitionsToDrop.isEmpty()) {
- for (Integer pId : partitionsToDrop) {
- taskIs.getRecord().getMapFields().remove(pName(jobName, pId));
- }
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- PropertyKey propertyKey = accessor.keyBuilder().idealStates(jobName);
- accessor.setProperty(propertyKey, taskIs);
- }
-
- // Update rebalancer context, previous ideal state.
- TaskUtil.setJobContext(_manager, jobName, jobCtx);
- TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
- setPrevResourceAssignment(jobName, newAssignment);
-
- LOG.debug("Job " + jobName + " new assignment " + Arrays
- .toString(newAssignment.getMappedPartitions().toArray()));
- return newAssignment;
- }
-
- private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
- WorkflowConfig workflowCfg) {
- Set<String> ret = new HashSet<String>();
- for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
- if (jobName.equals(currentJobName)) {
- continue;
- }
- JobContext jobContext = TaskUtil.getJobContext(_manager, jobName);
- if (jobContext == null) {
- continue;
- }
- for (int partition : jobContext.getPartitionSet()) {
- TaskPartitionState partitionState = jobContext.getPartitionState(partition);
- if (partitionState == TaskPartitionState.INIT ||
- partitionState == TaskPartitionState.RUNNING) {
- ret.add(jobContext.getAssignedParticipant(partition));
- }
- }
- }
-
- return ret;
- }
-
- private ResourceAssignment computeResourceMapping(String jobResource,
- WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
- Collection<String> liveInstances, CurrentStateOutput currStateOutput,
- WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
- ClusterDataCache cache) {
- TargetState jobTgtState = workflowConfig.getTargetState();
- // Update running status in workflow context
- if (jobTgtState == TargetState.STOP) {
- workflowCtx.setJobState(jobResource, TaskState.STOPPED);
- // Workflow has been stopped if all in progress jobs are stopped
- if (isWorkflowStopped(workflowCtx, workflowConfig)) {
- workflowCtx.setWorkflowState(TaskState.STOPPED);
- }
- } else {
- workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
- // Workflow is in progress if any task is in progress
- workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
- }
-
- // Used to keep track of tasks that have already been assigned to instances.
- Set<Integer> assignedPartitions = new HashSet<Integer>();
-
- // Used to keep track of tasks that have failed, but whose failure is acceptable
- Set<Integer> skippedPartitions = new HashSet<Integer>();
-
- // Keeps a mapping of (partition) -> (instance, state)
- Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
-
- // Keeps a mapping of (partition) -> (instance, state) of partitions have have been relocated
- Map<Integer, PartitionAssignment> relocatedPaMap = new TreeMap<Integer, PartitionAssignment>();
-
- Set<String> excludedInstances = getInstancesAssignedToOtherJobs(jobResource, workflowConfig);
-
- // Process all the current assignments of tasks.
- TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg);
- Set<Integer> allPartitions = taskAssignmentCal
- .getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache.getIdealStates());
-
- if (allPartitions == null || allPartitions.isEmpty()) {
- // Empty target partitions, mark the job as FAILED.
- String failureMsg = "Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!";
- LOG.info(failureMsg);
- jobCtx.setInfo(failureMsg);
- markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
- markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
- _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.FAILED);
- return new ResourceAssignment(jobResource);
- }
-
- Map<String, SortedSet<Integer>> taskAssignments =
- getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
- long currentTime = System.currentTimeMillis();
-
- LOG.debug("All partitions: " + allPartitions + " taskAssignment: " + taskAssignments
- + " excludedInstances: " + excludedInstances);
-
- for (Map.Entry<String, SortedSet<Integer>> entryInstance : taskAssignments.entrySet()) {
- String instance = entryInstance.getKey();
- if (excludedInstances.contains(instance)) {
- continue;
- }
-
- Set<Integer> pSet = entryInstance.getValue();
- // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
- // TASK_ERROR, ERROR.
- Set<Integer> donePartitions = new TreeSet<Integer>();
- for (int pId : pSet) {
- final String pName = pName(jobResource, pId);
-
- // Check for pending state transitions on this (partition, instance).
- Message pendingMessage =
- currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
- if (pendingMessage != null) {
- // There is a pending state transition for this (partition, instance). Just copy forward
- // the state assignment from the previous ideal state.
- Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
- if (stateMap != null) {
- String prevState = stateMap.get(instance);
- paMap.put(pId, new PartitionAssignment(instance, prevState));
- assignedPartitions.add(pId);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
- pName, instance, prevState));
- }
- }
-
- continue;
- }
-
- TaskPartitionState currState =
- TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
- pName), instance));
- jobCtx.setPartitionState(pId, currState);
-
- String taskMsg = currStateOutput.getInfo(jobResource, new Partition(
- pName), instance);
- if (taskMsg != null) {
- jobCtx.setPartitionInfo(pId, taskMsg);
- }
-
- // Process any requested state transitions.
- String requestedStateStr =
- currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
- if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
- TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
- if (requestedState.equals(currState)) {
- LOG.warn(String.format(
- "Requested state %s is the same as the current state for instance %s.",
- requestedState, instance));
- }
-
- paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
- assignedPartitions.add(pId);
- LOG.debug(String.format(
- "Instance %s requested a state transition to %s for partition %s.", instance,
- requestedState, pName));
- continue;
- }
-
- switch (currState) {
- case RUNNING:
- case STOPPED: {
- TaskPartitionState nextState;
- if (jobTgtState == TargetState.START) {
- nextState = TaskPartitionState.RUNNING;
- } else {
- nextState = TaskPartitionState.STOPPED;
- }
-
- paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
- assignedPartitions.add(pId);
- LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
- nextState, instance));
- }
- break;
- case COMPLETED: {
- // The task has completed on this partition. Mark as such in the context object.
- donePartitions.add(pId);
- LOG.debug(String
- .format(
- "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
- pName, currState));
- partitionsToDropFromIs.add(pId);
- markPartitionCompleted(jobCtx, pId);
- }
- break;
- case TIMED_OUT:
- case TASK_ERROR:
- case TASK_ABORTED:
- case ERROR: {
- donePartitions.add(pId); // The task may be rescheduled on a different instance.
- LOG.debug(String.format(
- "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", pName,
- currState, taskMsg));
- markPartitionError(jobCtx, pId, currState, true);
- // The error policy is to fail the task as soon a single partition fails for a specified
- // maximum number of attempts or task is in ABORTED state.
- if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask() ||
- currState.equals(TaskPartitionState.TASK_ABORTED)) {
- // If we have some leeway for how many tasks we can fail, then we don't have
- // to fail the job immediately
- if (skippedPartitions.size() >= jobCfg.getFailureThreshold()) {
- markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
- _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.FAILED);
- markAllPartitionsError(jobCtx, currState, false);
- addAllPartitions(allPartitions, partitionsToDropFromIs);
-
- // remove IdealState of this job
- cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
- return buildEmptyAssignment(jobResource, currStateOutput);
- } else {
- skippedPartitions.add(pId);
- partitionsToDropFromIs.add(pId);
- }
-
- LOG.debug("skippedPartitions:" + skippedPartitions);
- } else {
- // Mark the task to be started at some later time (if enabled)
- markPartitionDelayed(jobCfg, jobCtx, pId);
- }
- }
- break;
- case INIT:
- case DROPPED: {
- // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
- donePartitions.add(pId);
- LOG.debug(String.format(
- "Task partition %s has state %s. It will be dropped from the current ideal state.",
- pName, currState));
- }
- break;
- default:
- throw new AssertionError("Unknown enum symbol: " + currState);
- }
- }
-
- // Remove the set of task partitions that are completed or in one of the error states.
- pSet.removeAll(donePartitions);
- }
-
- // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
- scheduleForNextTask(jobResource, jobCtx, currentTime);
-
- if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
- markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
- _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED);
- // remove IdealState of this job
- cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
- }
-
- // Make additional task assignments if needed.
- if (jobTgtState == TargetState.START) {
- // Contains the set of task partitions that must be excluded from consideration when making
- // any new assignments.
- // This includes all completed, failed, delayed, and already assigned partitions.
- //Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
- //HACK: Modify excludeSet to be empty
- Set<Integer> excludeSet = Sets.newTreeSet();
- addCompletedTasks(excludeSet, jobCtx, allPartitions);
- addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
- excludeSet.addAll(skippedPartitions);
- excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
- // Get instance->[partition, ...] mappings for the target resource.
- Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal
- .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
- workflowConfig, workflowCtx, allPartitions, cache.getIdealStates());
- for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
- String instance = entry.getKey();
- if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances
- .contains(instance)) {
- continue;
- }
-
- // Contains the set of task partitions currently assigned to the instance.
- Set<Integer> pSet = entry.getValue();
- int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
- if (numToAssign > 0) {
- List<Integer> nextPartitions =
- getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
- for (Integer pId : nextPartitions) {
- // if partition is not currently assigned to instance then it may have been moved
- if (!pSet.contains(pId)) {
- // look at current assignment to see if task is running on another instance
- for (Map.Entry<String, SortedSet<Integer>> currentEntry : taskAssignments.entrySet()) {
- String currentInstance = currentEntry.getKey();
- Set<Integer> currentpSet = currentEntry.getValue();
-
- // task is being moved, so transition to STOPPED state
- if (!instance.equals(currentInstance) && currentpSet.contains(pId)) {
- relocatedPaMap.put(pId, new PartitionAssignment(currentInstance, TaskPartitionState.STOPPED.name()));
- break;
- }
- }
- }
-
- String pName = pName(jobResource, pId);
- paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
- excludeSet.add(pId);
- jobCtx.setAssignedParticipant(pId, instance);
- jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
- jobCtx.setPartitionStartTime(pId, System.currentTimeMillis());
- LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
- TaskPartitionState.RUNNING, instance));
- }
- }
- }
- }
-
- // Construct a ResourceAssignment object from the map of partition assignments.
- ResourceAssignment ra = new ResourceAssignment(jobResource);
- for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
- PartitionAssignment pa = e.getValue();
-
- if (relocatedPaMap.containsKey(e.getKey())) {
- PartitionAssignment currentPa = relocatedPaMap.get(e.getKey());
-
- ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
- ImmutableMap.of(pa._instance, pa._state, currentPa._instance, currentPa._state));
- } else {
- ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())), ImmutableMap.of(pa._instance, pa._state));
- }
- }
-
- return ra;
- }
-
- private void markJobComplete(String jobName, JobContext jobContext,
- WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
- long currentTime = System.currentTimeMillis();
- workflowContext.setJobState(jobName, TaskState.COMPLETED);
- jobContext.setFinishTime(currentTime);
- if (isWorkflowFinished(workflowContext, workflowConfig)) {
- workflowContext.setFinishTime(currentTime);
- }
- }
-
- private void scheduleForNextTask(String job, JobContext jobCtx, long now) {
- // Clear current entries if they exist and are expired
- long currentTime = now;
- long scheduledTime = _scheduledRebalancer.getRebalanceTime(job);
- if (scheduledTime > 0 && currentTime > scheduledTime) {
- _scheduledRebalancer.removeScheduledRebalance(job);
- }
-
- // Figure out the earliest schedulable time in the future of a non-complete job
- boolean shouldSchedule = false;
- long earliestTime = Long.MAX_VALUE;
- for (int p : jobCtx.getPartitionSet()) {
- long retryTime = jobCtx.getNextRetryTime(p);
- TaskPartitionState state = jobCtx.getPartitionState(p);
- state = (state != null) ? state : TaskPartitionState.INIT;
- Set<TaskPartitionState> errorStates =
- Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
- TaskPartitionState.TIMED_OUT);
- if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
- earliestTime = retryTime;
- shouldSchedule = true;
- }
- }
-
- // If any was found, then schedule it
- if (shouldSchedule) {
- _scheduledRebalancer.scheduleRebalance(_manager, job, earliestTime);
- }
- }
-
- /**
- * Get the last task assignment for a given job
- *
- * @param resourceName the name of the job
- * @return {@link ResourceAssignment} instance, or null if no assignment is available
- */
- private ResourceAssignment getPrevResourceAssignment(String resourceName) {
- ZNRecord r = _manager.getHelixPropertyStore()
- .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
- null, AccessOption.PERSISTENT);
- return r != null ? new ResourceAssignment(r) : null;
- }
-
- /**
- * Set the last task assignment for a given job
- *
- * @param resourceName the name of the job
- * @param ra {@link ResourceAssignment} containing the task assignment
- */
- private void setPrevResourceAssignment(String resourceName,
- ResourceAssignment ra) {
- _manager.getHelixPropertyStore()
- .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
- ra.getRecord(), AccessOption.PERSISTENT);
- }
-
- /**
- * Checks if the job has completed.
- * @param ctx The rebalancer context.
- * @param allPartitions The set of partitions to check.
- * @param skippedPartitions partitions that failed, but whose failure is acceptable
- * @return true if all task partitions have been marked with status
- * {@link TaskPartitionState#COMPLETED} in the rebalancer
- * context, false otherwise.
- */
- private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
- Set<Integer> skippedPartitions, JobConfig cfg) {
- for (Integer pId : allPartitions) {
- TaskPartitionState state = ctx.getPartitionState(pId);
- if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED
- && !isTaskGivenup(ctx, cfg, pId)) {
- return false;
- }
- }
- return true;
- }
-
-
- private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
- for (Integer pId : toAdd) {
- destination.add(pId);
- }
- }
-
- private static void addCompletedTasks(Set<Integer> set, JobContext ctx,
- Iterable<Integer> pIds) {
- for (Integer pId : pIds) {
- TaskPartitionState state = ctx.getPartitionState(pId);
- if (state == TaskPartitionState.COMPLETED) {
- set.add(pId);
- }
- }
- }
-
- private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
- TaskPartitionState state = ctx.getPartitionState(pId);
- if ((state != null) && (state.equals(TaskPartitionState.TASK_ABORTED) || state
- .equals(TaskPartitionState.ERROR))) {
- return true;
- }
- return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
- }
-
- // add all partitions that have been tried maxNumberAttempts
- private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
- JobConfig cfg) {
- for (Integer pId : pIds) {
- if (isTaskGivenup(ctx, cfg, pId)) {
- set.add(pId);
- }
- }
- }
-
- private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
- Set<Integer> excluded, int n) {
- List<Integer> result = new ArrayList<Integer>();
- for (Integer pId : candidatePartitions) {
- if (result.size() >= n) {
- break;
- }
-
- if (!excluded.contains(pId)) {
- result.add(pId);
- }
- }
-
- return result;
- }
-
- private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
- long delayInterval = cfg.getTaskRetryDelay();
- if (delayInterval <= 0) {
- return;
- }
- long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
- ctx.setNextRetryTime(p, nextStartTime);
- }
-
- private static void markPartitionCompleted(JobContext ctx, int pId) {
- ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
- ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
- ctx.incrementNumAttempts(pId);
- }
-
- private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
- boolean incrementAttempts) {
- ctx.setPartitionState(pId, state);
- ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
- if (incrementAttempts) {
- ctx.incrementNumAttempts(pId);
- }
- }
-
- private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
- boolean incrementAttempts) {
- for (int pId : ctx.getPartitionSet()) {
- markPartitionError(ctx, pId, state, incrementAttempts);
- }
- }
-
- /**
- * Return the assignment of task partitions per instance.
- */
- private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
- Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
- Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
- for (String instance : instanceList) {
- result.put(instance, new TreeSet<Integer>());
- }
-
- for (Partition partition : assignment.getMappedPartitions()) {
- int pId = TaskUtil.getPartitionId(partition.getPartitionName());
- if (includeSet.contains(pId)) {
- Map<String, String> replicaMap = assignment.getReplicaMap(partition);
- for (String instance : replicaMap.keySet()) {
- SortedSet<Integer> pList = result.get(instance);
- if (pList != null) {
- pList.add(pId);
- }
- }
- }
- }
- return result;
- }
-
- private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
- Set<Integer> nonReadyPartitions = Sets.newHashSet();
- for (int p : ctx.getPartitionSet()) {
- long toStart = ctx.getNextRetryTime(p);
- if (now < toStart) {
- nonReadyPartitions.add(p);
- }
- }
- return nonReadyPartitions;
- }
-
- private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) {
- Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
- if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
- return genericTaskAssignmentCal;
- } else {
- return fixTaskAssignmentCal;
- }
- }
-
- /**
- * Computes the partition name given the resource name and partition id.
- */
- private String pName(String resource, int pId) {
- return resource + "_" + pId;
- }
-
- /**
- * An (instance, state) pair.
- */
- private static class PartitionAssignment {
- private final String _instance;
- private final String _state;
-
- private PartitionAssignment(String instance, String state) {
- _instance = instance;
- _state = state;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index aaf5f05..10ef3db 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -307,6 +307,9 @@ public class GobblinHelixJobLauncherTest {
gobblinHelixJobLauncher.close();
+ // job queue deleted asynchronously after close
+ waitForQueueCleanup(taskDriver, jobName);
+
jobContext = taskDriver.getJobContext(jobContextName);
// job context should have been deleted
@@ -325,7 +328,9 @@ public class GobblinHelixJobLauncherTest {
gobblinHelixJobLauncher2.close();
- // job queue deleted after close
+ // job queue deleted asynchronously after close
+ waitForQueueCleanup(taskDriver, jobName2);
+
workflowConfig = taskDriver.getWorkflowConfig(jobName2);
Assert.assertNull(workflowConfig);
@@ -358,4 +363,19 @@ public class GobblinHelixJobLauncherTest {
this.closer.close();
}
}
+
+ private void waitForQueueCleanup(TaskDriver taskDriver, String queueName) {
+ // Poll once per second, for up to 60 attempts, until the queue's WorkflowConfig is gone.
+ // A timeout is not reported here; callers assert on the cleanup afterwards.
+ for (int i = 0; i < 60 && taskDriver.getWorkflowConfig(queueName) != null; i++) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status rather than swallowing it, and stop waiting so the
+ // test thread can terminate promptly.
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gradle/scripts/computeVersions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/computeVersions.gradle b/gradle/scripts/computeVersions.gradle
index 2c05350..cf7baaa 100644
--- a/gradle/scripts/computeVersions.gradle
+++ b/gradle/scripts/computeVersions.gradle
@@ -62,7 +62,7 @@ ext.gradleVersionMajor = Integer.parseInt(gradleVersions[0])
ext.gradleVersionMinor = Integer.parseInt(gradleVersions[1])
println "Detected Gradle version major=" + gradleVersionMajor + " minor=" + gradleVersionMinor
-ext.dropwizardMetricsVersion = '3.1.0'
+ext.dropwizardMetricsVersion = '3.2.3'
ext.findBugsVersion = '3.0.0'
ext.googleVersion = '1.22.0'
ext.slf4jVersion = '1.7.21'
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gradle/scripts/dependencyDefinitions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index d138a55..ad1c1cf 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -61,7 +61,7 @@ ext.externalDependency = [
"hadoopYarnMiniCluster": "org.apache.hadoop:hadoop-minicluster:" + hadoopVersion,
"hadoopAnnotations": "org.apache.hadoop:hadoop-annotations:" + hadoopVersion,
"hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
- "helix": "org.apache.helix:helix-core:0.6.7",
+ "helix": "org.apache.helix:helix-core:0.6.9",
"hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
"hiveService": "org.apache.hive:hive-service:" + hiveVersion,
"hiveJdbc": "org.apache.hive:hive-jdbc:" + hiveVersion,