You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/09 18:58:34 UTC
[2/4] helix git commit: Support configurable job purge interval for a
queue.
Support configurable job purge interval for a queue.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0aeb5579
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0aeb5579
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0aeb5579
Branch: refs/heads/master
Commit: 0aeb5579d4f7130d6a4310d2d817e0153620cae0
Parents: e530bf5
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Feb 7 14:59:10 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Oct 6 12:23:47 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskDriver.java | 8 +-
.../org/apache/helix/task/TaskRebalancer.java | 1 -
.../java/org/apache/helix/task/TaskUtil.java | 13 +--
.../java/org/apache/helix/task/Workflow.java | 11 ++
.../org/apache/helix/task/WorkflowConfig.java | 68 ++++++++++-
.../org/apache/helix/task/WorkflowContext.java | 75 ++++++++++--
.../apache/helix/task/WorkflowRebalancer.java | 114 +++++++++++--------
.../integration/TestDelayedAutoRebalance.java | 29 +++--
.../integration/task/TestJobQueueCleanUp.java | 49 ++++++++
9 files changed, 281 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 15c906a..f21c005 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -48,10 +47,6 @@ import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.log4j.Logger;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
/**
* CLI for scheduling/canceling workflows
*/
@@ -117,8 +112,7 @@ public class TaskDriver {
flow.validate();
WorkflowConfig newWorkflowConfig =
- new WorkflowConfig.Builder().setConfigMap(flow.getResourceConfigMap())
- .setWorkflowId(flow.getName()).build();
+ new WorkflowConfig.Builder(flow.getWorkflowConfig()).setWorkflowId(flow.getName()).build();
Map<String, String> jobTypes = new HashMap<String, String>();
// add all job configs.
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 20a9233..5dbb2a1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -43,7 +43,6 @@ import com.google.common.collect.Maps;
* Abstract rebalancer class for the {@code Task} state model.
*/
public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
- public static final String START_TIME_KEY = "StartTime";
private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
protected static long JOB_PURGE_INTERVAL = 10 * 60 * 1000;
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index f064bbf..42d252e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -36,17 +36,14 @@ import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
/**
* Static utility methods.
@@ -679,7 +676,7 @@ public class TaskUtil {
/* remove IS/EV, config and context of a job */
// Jobname is here should be NamespacedJobName.
- private static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
+ protected static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
String job) {
boolean success = true;
if (!cleanupJobIdealStateExtView(accessor, job)) {
@@ -688,7 +685,7 @@ public class TaskUtil {
job));
success = false;
}
- if (!removeJobConfig(accessor, job)) {
+ if (!removeWorkflowJobConfig(accessor, job)) {
LOG.warn(String.format("Error occurred while trying to remove job config for %s.", job));
success = false;
}
@@ -702,7 +699,8 @@ public class TaskUtil {
/** Remove the job name from the DAG from the queue configuration */
// Job name should be namespaced job name here.
- private static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
+
+ protected static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
final Set<String> jobsToRemove, final boolean maintainDependency) {
// Now atomically clear the DAG
DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
@@ -741,7 +739,7 @@ public class TaskUtil {
/**
* update workflow's property to remove jobs from JOB_STATES if there are already started.
*/
- private static boolean removeJobsState(final HelixPropertyStore propertyStore,
+ protected static boolean removeJobsState(final HelixPropertyStore propertyStore,
final String workflow, final Set<String> jobs) {
String contextPath =
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, TaskUtil.CONTEXT_NODE);
@@ -751,6 +749,7 @@ public class TaskUtil {
if (currentData != null) {
WorkflowContext workflowContext = new WorkflowContext(currentData);
workflowContext.removeJobStates(jobs);
+ workflowContext.removeJobStartTime(jobs);
currentData = workflowContext.getRecord();
}
return currentData;
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index a7060c3..74c325e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -297,11 +297,22 @@ public class Workflow {
return this;
}
+ /**
+ * Set the config via an existing workflowConfig.
+ * BE CAUTIOUS: All the previous settings will be overridden by the settings here.
+ *
+ * @param workflowConfig
+ * @return
+ */
public Builder setWorkflowConfig(WorkflowConfig workflowConfig) {
_workflowConfigBuilder = new WorkflowConfig.Builder(workflowConfig);
return this;
}
+ public WorkflowConfig getWorkflowConfig() {
+ return _workflowConfigBuilder.build();
+ }
+
public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
if (_workflowConfigBuilder == null) {
_workflowConfigBuilder = new WorkflowConfig.Builder();
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 80b5973..cc0fdce 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -63,7 +63,8 @@ public class WorkflowConfig extends ResourceConfig {
JobTypes,
IsJobQueue,
/* Allow multiple jobs in this workflow to be assigned to a same instance or not */
- AllowOverlapJobAssignment
+ AllowOverlapJobAssignment,
+ JobPurgeInterval
}
/* Default values */
@@ -77,6 +78,7 @@ public class WorkflowConfig extends ResourceConfig {
public static final boolean DEFAULT_JOB_QUEUE = false;
public static final boolean DEFAULT_MONITOR_DISABLE = true;
public static final boolean DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT = false;
+ protected static final long DEFAULT_JOB_PURGE_INTERVAL = 30 * 60 * 1000; //default 30 minutes
public WorkflowConfig(HelixProperty property) {
super(property.getRecord());
@@ -85,7 +87,7 @@ public class WorkflowConfig extends ResourceConfig {
public WorkflowConfig(WorkflowConfig cfg, String workflowId) {
this(workflowId, cfg.getJobDag(), cfg.getParallelJobs(), cfg.getTargetState(), cfg.getExpiry(),
cfg.getFailureThreshold(), cfg.isTerminable(), cfg.getScheduleConfig(), cfg.getCapacity(),
- cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(), cfg.isAllowOverlapJobAssignment());
+ cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(), cfg.isAllowOverlapJobAssignment(),cfg.getJobPurgeInterval());
}
/* Member variables */
@@ -94,7 +96,7 @@ public class WorkflowConfig extends ResourceConfig {
protected WorkflowConfig(String workflowId, JobDag jobDag, int parallelJobs,
TargetState targetState, long expiry, int failureThreshold, boolean terminable,
ScheduleConfig scheduleConfig, int capacity, String workflowType, boolean isJobQueue,
- Map<String, String> jobTypes, boolean allowOverlapJobAssignment) {
+ Map<String, String> jobTypes, boolean allowOverlapJobAssignment, long purgeInterval) {
super(workflowId);
putSimpleConfig(WorkflowConfigProperty.WorkflowID.name(), workflowId);
@@ -110,6 +112,7 @@ public class WorkflowConfig extends ResourceConfig {
putSimpleConfig(WorkflowConfigProperty.IsJobQueue.name(), String.valueOf(isJobQueue));
putSimpleConfig(WorkflowConfigProperty.FailureThreshold.name(), String.valueOf(failureThreshold));
putSimpleConfig(WorkflowConfigProperty.AllowOverlapJobAssignment.name(), String.valueOf(allowOverlapJobAssignment));
+ putSimpleConfig(WorkflowConfigProperty.JobPurgeInterval.name(), String.valueOf(purgeInterval));
if (capacity > 0) {
putSimpleConfig(WorkflowConfigProperty.capacity.name(), String.valueOf(capacity));
@@ -171,6 +174,11 @@ public class WorkflowConfig extends ResourceConfig {
return _record.getLongField(WorkflowConfigProperty.Expiry.name(), DEFAULT_EXPIRY);
}
+ public long getJobPurgeInterval() {
+ return _record
+ .getLongField(WorkflowConfigProperty.JobPurgeInterval.name(), DEFAULT_JOB_PURGE_INTERVAL);
+ }
+
/**
* This Failure threshold only works for generic workflow. Will be ignored by JobQueue
* @return
@@ -312,13 +320,14 @@ public class WorkflowConfig extends ResourceConfig {
private boolean _isJobQueue = DEFAULT_JOB_QUEUE;
private Map<String, String> _jobTypes;
private boolean _allowOverlapJobAssignment = DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT;
+ private long _jobPurgeInterval = DEFAULT_JOB_PURGE_INTERVAL;
public WorkflowConfig build() {
validate();
return new WorkflowConfig(_workflowId, _taskDag, _parallelJobs, _targetState, _expiry,
_failureThreshold, _isTerminable, _scheduleConfig, _capacity, _workflowType, _isJobQueue,
- _jobTypes, _allowOverlapJobAssignment);
+ _jobTypes, _allowOverlapJobAssignment, _jobPurgeInterval);
}
public Builder() {}
@@ -337,6 +346,7 @@ public class WorkflowConfig extends ResourceConfig {
_isJobQueue = workflowConfig.isJobQueue();
_jobTypes = workflowConfig.getJobTypes();
_allowOverlapJobAssignment = workflowConfig.isAllowOverlapJobAssignment();
+ _jobPurgeInterval = workflowConfig.getJobPurgeInterval();
}
public Builder setWorkflowId(String v) {
@@ -359,16 +369,53 @@ public class WorkflowConfig extends ResourceConfig {
return this;
}
+ /**
+ * The expiry time for this workflow. Helix may clean up the workflow information after the
+ * expiry time from the completion of the workflow.
+ *
+ * @param v
+ * @param unit
+ *
+ * @return
+ */
public Builder setExpiry(long v, TimeUnit unit) {
_expiry = unit.toMillis(v);
return this;
}
+ /**
+ * The expiry time for this workflow. Helix may clean up the workflow information after the
+ * expiry time from the completion of the workflow.
+ *
+ * @param v in milliseconds
+ *
+ * @return
+ */
public Builder setExpiry(long v) {
_expiry = v;
return this;
}
+ /**
+ * The interval at which Helix should periodically clean up all completed jobs. This config
+ * applies only to a JobQueue.
+ *
+ * @param t in milliseconds
+ *
+ * @return
+ */
+ public Builder setJobPurgeInterval(long t) {
+ _jobPurgeInterval = t;
+ return this;
+ }
+
+ /**
+ * The maximum allowed number of failed jobs before Helix marks the workflow as failed.
+ *
+ * @param failureThreshold
+ *
+ * @return
+ */
public Builder setFailureThreshold(int failureThreshold) {
_failureThreshold = failureThreshold;
return this;
@@ -376,7 +423,7 @@ public class WorkflowConfig extends ResourceConfig {
/**
* This method only applies for JobQueue, will be ignored in generic workflows
- * @param capacity The number of capacity
+ * @param capacity The max number of jobs allowed in the queue
* @return This builder
*/
public Builder setCapacity(int capacity) {
@@ -389,7 +436,7 @@ public class WorkflowConfig extends ResourceConfig {
return this;
}
- public Builder setTerminable(boolean isTerminable) {
+ protected Builder setTerminable(boolean isTerminable) {
_isTerminable = isTerminable;
return this;
}
@@ -425,6 +472,7 @@ public class WorkflowConfig extends ResourceConfig {
builder.setConfigMap(cfg);
return builder;
}
+
// TODO: Add API to set map fields. This API only set simple fields
public Builder setConfigMap(Map<String, String> cfg) {
if (cfg.containsKey(WorkflowConfigProperty.Expiry.name())) {
@@ -459,6 +507,14 @@ public class WorkflowConfig extends ResourceConfig {
}
}
+ if (cfg.containsKey(WorkflowConfigProperty.JobPurgeInterval.name())) {
+ long jobPurgeInterval =
+ Long.valueOf(cfg.get(WorkflowConfigProperty.JobPurgeInterval.name()));
+ if (jobPurgeInterval > 0) {
+ setJobPurgeInterval(jobPurgeInterval);
+ }
+ }
+
if (cfg.containsKey(WorkflowConfigProperty.FailureThreshold.name())) {
int threshold = Integer.valueOf(cfg.get(WorkflowConfigProperty.FailureThreshold.name()));
if (threshold >= 0) {
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 563e2e8..2fc4fe1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -27,12 +27,15 @@ import java.util.TreeMap;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
/**
* Typed interface to the workflow context information stored by {@link TaskRebalancer} in the Helix
* property store
*/
public class WorkflowContext extends HelixProperty {
+ private static final Logger LOG = Logger.getLogger(WorkflowContext.class);
+
protected enum WorkflowContextProperties {
STATE,
START_TIME,
@@ -40,7 +43,8 @@ public class WorkflowContext extends HelixProperty {
JOB_STATES,
LAST_SCHEDULED_WORKFLOW,
SCHEDULED_WORKFLOWS,
- LAST_PURGE_TIME
+ LAST_PURGE_TIME,
+ StartTime
}
public static final int UNSTARTED = -1;
@@ -78,14 +82,6 @@ public class WorkflowContext extends HelixProperty {
states.put(job, s.name());
}
- protected void removeJobStates(Set<String> jobs) {
- Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
- if (states != null) {
- states.keySet().removeAll(jobs);
- _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
- }
- }
-
public TaskState getJobState(String job) {
Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
if (states == null) {
@@ -100,6 +96,67 @@ public class WorkflowContext extends HelixProperty {
return TaskState.valueOf(s);
}
+ protected void removeJobStates(Set<String> jobs) {
+ Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+ if (states != null) {
+ states.keySet().removeAll(jobs);
+ _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
+ }
+ }
+
+ protected void setJobStartTime(String job, long time) {
+ Map<String, String> startTimes =
+ _record.getMapField(WorkflowContextProperties.StartTime.name());
+ if (startTimes == null) {
+ startTimes = new HashMap<>();
+ _record.setMapField(WorkflowContextProperties.StartTime.name(), startTimes);
+ }
+ startTimes.put(job, String.valueOf(time));
+ }
+
+ protected void removeJobStartTime(Set<String> jobs) {
+ Map<String, String> startTimes =
+ _record.getMapField(WorkflowContextProperties.StartTime.name());
+ if (startTimes != null) {
+ startTimes.keySet().removeAll(jobs);
+ _record.setMapField(WorkflowContextProperties.StartTime.name(), startTimes);
+ }
+ }
+
+ public long getJobStartTime(String job) {
+ Map<String, String> startTimes =
+ _record.getMapField(WorkflowContextProperties.StartTime.name());
+ if (startTimes == null || !startTimes.containsKey(job)) {
+ return -1;
+ }
+
+ String t = startTimes.get(job);
+ if (t == null) {
+ return -1;
+ }
+
+ try {
+ long ret = Long.valueOf(t);
+ return ret;
+ } catch (NumberFormatException e) {
+ LOG.warn("Number error " + t + " for job start time of " + job);
+ return -1;
+ }
+ }
+
+ public Map<String, Long> getJobStartTimes() {
+ Map<String, Long> startTimes = new HashMap<>();
+ Map<String, String> startTimesMap =
+ _record.getMapField(WorkflowContextProperties.StartTime.name());
+ if (startTimesMap != null) {
+ for (Map.Entry<String, String> time : startTimesMap.entrySet()) {
+ startTimes.put(time.getKey(), Long.valueOf(time.getValue()));
+ }
+ }
+
+ return startTimes;
+ }
+
public Map<String, TaskState> getJobStates() {
Map<String, TaskState> jobStates = new HashMap<>();
Map<String, String> stateFieldMap =
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 6e6727c..ac2ac87 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -53,9 +53,9 @@ import org.apache.log4j.Logger;
public class WorkflowRebalancer extends TaskRebalancer {
private static final Logger LOG = Logger.getLogger(WorkflowRebalancer.class);
- @Override
- public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
- IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+ @Override public ResourceAssignment computeBestPossiblePartitionState(
+ ClusterDataCache clusterData, IdealState taskIs, Resource resource,
+ CurrentStateOutput currStateOutput) {
final String workflow = resource.getResourceName();
LOG.debug("Computer Best Partition for workflow: " + workflow);
@@ -93,8 +93,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
long currentTime = System.currentTimeMillis();
// Check if workflow has been finished and mark it if it is.
- if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
- && isWorkflowFinished(workflowCtx, workflowCfg)) {
+ if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx,
+ workflowCfg)) {
workflowCtx.setFinishTime(currentTime);
TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
}
@@ -123,8 +123,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
// Check for readiness, and stop processing if it's not ready
- boolean isReady =
- scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
+ boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
if (isReady) {
// Schedule jobs from this workflow.
scheduleJobs(workflow, workflowCfg, workflowCtx);
@@ -142,10 +141,11 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
/**
- * Figure out whether the jobs in the workflow should be run,
- * and if it's ready, then just schedule it
+ * Figure out whether the jobs in the workflow should be run, and if it's ready, then just
+ * schedule it
*/
- private void scheduleJobs(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+ private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
+ WorkflowContext workflowCtx) {
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
if (scheduleConfig != null && scheduleConfig.isRecurring()) {
LOG.debug("Jobs from recurring workflow are not schedule-able");
@@ -163,7 +163,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
LOG.debug(String.format("Workflow %s already have enough job in progress, "
- + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
+ + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
break;
}
@@ -174,25 +174,16 @@ public class WorkflowRebalancer extends TaskRebalancer {
// Since the start time is calculated base on the time of completion of parent jobs for this
// job, the calculated start time should only be calculate once. Persist the calculated time
// in WorkflowContext znode.
- Map<String, String> startTimeMap = workflowCtx.getRecord().getMapField(START_TIME_KEY);
- if (startTimeMap == null) {
- startTimeMap = new HashMap<String, String>();
- workflowCtx.getRecord().setMapField(START_TIME_KEY, startTimeMap);
- }
-
- long calculatedStartTime = System.currentTimeMillis();
- if (startTimeMap.containsKey(job)) {
- // Get the start time if it is already calculated
- calculatedStartTime = Long.parseLong(startTimeMap.get(job));
- } else {
+ long calculatedStartTime = workflowCtx.getJobStartTime(job);
+ if (calculatedStartTime < 0) {
+ // Calculate the start time if it is not already calculated
+ calculatedStartTime = System.currentTimeMillis();
// If the start time is not calculated before, do the math.
if (jobConfig.getExecutionDelay() >= 0) {
calculatedStartTime += jobConfig.getExecutionDelay();
}
calculatedStartTime = Math.max(calculatedStartTime, jobConfig.getExecutionStart());
- startTimeMap.put(job, String.valueOf(calculatedStartTime));
- workflowCtx.getRecord().setMapField(START_TIME_KEY, startTimeMap);
- TaskUtil.setWorkflowContext(_manager, jobConfig.getWorkflow(), workflowCtx);
+ workflowCtx.setJobStartTime(job, calculatedStartTime);
}
// Time is not ready. Set a trigger and update the start time.
@@ -285,9 +276,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
/**
* Check if a workflow is ready to schedule, and schedule a rebalance if it is not
*
- * @param workflow the Helix resource associated with the workflow
- * @param workflowCfg the workflow to check
- * @param workflowCtx the current workflow context
+ * @param workflow the Helix resource associated with the workflow
+ * @param workflowCfg the workflow to check
+ * @param workflowCtx the current workflow context
+ *
* @return true if the workflow is ready for schedule, false if not ready
*/
private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg,
@@ -330,7 +322,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
long offsetMultiplier = (-delayFromStart) / period;
long timeToSchedule = period * offsetMultiplier + startTime.getTime();
-
// Now clone the workflow if this clone has not yet been created
DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
df.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -350,7 +341,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
// Persist workflow start regardless of success to avoid retrying and failing
workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
- TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
}
// Change the time to trigger the pipeline to that of the next run
@@ -379,6 +369,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
* @param origWorkflowName the name of the existing workflow
* @param newWorkflowName the name of the new workflow
* @param newStartTime a provided start time that deviates from the desired start time
+ *
* @return the cloned workflow, or null if there was a problem cloning the existing one
*/
public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
@@ -452,9 +443,14 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
/**
+ * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
+ * contexts associated with this workflow, and all job-level information, including their
+ * configs, contexts, IS and EV.
*/
private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
LOG.info("Cleaning up workflow: " + workflow);
@@ -483,36 +479,54 @@ public class WorkflowRebalancer extends TaskRebalancer {
*/
// TODO: run this in a separate thread.
// Get all jobConfigs & jobContext from ClusterCache.
- protected void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+ private void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
WorkflowContext workflowContext) {
- if (workflowContext.getLastJobPurgeTime() + JOB_PURGE_INTERVAL > System.currentTimeMillis()) {
- return;
- }
+ long purgeInterval = workflowConfig.getJobPurgeInterval();
+ long currentTime = System.currentTimeMillis();
- Set<String> expiredJobs = TaskUtil
- .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
- workflowConfig, workflowContext);
- for (String job : expiredJobs) {
- _scheduledRebalancer.removeScheduledRebalance(job);
- }
- if (!TaskUtil
- .removeJobsFromWorkflow(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
- workflow, expiredJobs, true)) {
- LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+ if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
+ Set<String> expiredJobs = TaskUtil
+ .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+ workflowConfig, workflowContext);
+
+ if (expiredJobs.isEmpty()) {
+ LOG.info("No job to purge for the queue " + workflow);
+ } else {
+ LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+ for (String job : expiredJobs) {
+ if (!TaskUtil
+ .removeJob(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), job)) {
+ LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+ }
+ _scheduledRebalancer.removeScheduledRebalance(job);
+ }
+ if (!TaskUtil
+ .removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+ LOG.warn(
+ "Error occurred while trying to remove jobs " + expiredJobs + " from the workflow "
+ + workflow);
+ }
+ // remove job states in workflowContext.
+ workflowContext.removeJobStates(expiredJobs);
+ workflowContext.removeJobStartTime(expiredJobs);
+ }
+ workflowContext.setLastJobPurgeTime(currentTime);
}
- long currentTime = System.currentTimeMillis();
- long nextPurgeTime = currentTime + JOB_PURGE_INTERVAL;
- workflowContext.setLastJobPurgeTime(currentTime);
+ setNextJobPurgeTime(workflow, currentTime, purgeInterval);
+ }
+
+ private void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval) {
+ long nextPurgeTime = currentTime + purgeInterval;
long currentScheduledTime = _scheduledRebalancer.getRebalanceTime(workflow);
if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
_scheduledRebalancer.scheduleRebalance(_manager, workflow, nextPurgeTime);
}
}
- @Override
- public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
- CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+ @Override public IdealState computeNewIdealState(String resourceName,
+ IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+ ClusterDataCache clusterData) {
// Nothing to do here with workflow resource.
return currentIdealState;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
index d875a60..f431285 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
@@ -177,7 +177,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
_participants.get(0).getInstanceName());
}
@@ -188,7 +190,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
}
}
@@ -222,7 +226,8 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
}
Thread.sleep(delay + 10000);
@@ -230,7 +235,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, _replica);
}
}
@@ -267,8 +274,10 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
- validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
+ validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
_participants.get(0).getInstanceName());
}
@@ -317,7 +326,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, _replica);
}
disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
@@ -404,6 +415,10 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String partition : is.getPartitionSet()) {
Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+ Assert.assertNotNull(assignmentMap,
+ is.getResourceName() + "'s best possible assignment is null for partition " + partition);
+ Assert.assertTrue(!assignmentMap.isEmpty(),
+ is.getResourceName() + "'s partition " + partition + " has no best possible map in IS.");
boolean hasTopState = false;
int activeReplica = 0;
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index a0a1617..6eecf20 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -19,11 +19,16 @@ package org.apache.helix.integration.task;
* under the License.
*/
+import java.util.HashSet;
+import java.util.Set;
import org.apache.helix.TestHelper;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -74,4 +79,48 @@ public class TestJobQueueCleanUp extends TaskTestBase {
_driver.cleanupQueue(queueName);
Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2);
}
+
+ @Test
+ public void testJobQueueAutoCleanUp() throws InterruptedException {
+ int capacity = 10;
+ String queueName = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
+ WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(builder.getWorkflowConfig());
+ cfgBuilder.setJobPurgeInterval(1000);
+ builder.setWorkflowConfig(cfgBuilder.build());
+
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2).setJobCommandConfigMap(
+ ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, String.valueOf(capacity / 2)))
+ .setExpiry(200L);
+ Set<String> deletedJobs = new HashSet<String>();
+ Set<String> remainJobs = new HashSet<String>();
+ for (int i = 0; i < capacity; i++) {
+ builder.enqueueJob("JOB" + i, jobBuilder);
+ if (i < capacity/2) {
+ deletedJobs.add("JOB" + i);
+ } else {
+ remainJobs.add(TaskUtil.getNamespacedJobName(queueName, "JOB" + i));
+ }
+ }
+ _driver.start(builder.build());
+ _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + (capacity - 1)), TaskState.FAILED);
+ Thread.sleep(2000);
+
+ WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+ Assert.assertEquals(config.getJobDag().getAllNodes(), remainJobs);
+
+ WorkflowContext context = _driver.getWorkflowContext(queueName);
+ Assert.assertEquals(context.getJobStates().keySet(), remainJobs);
+ Assert.assertTrue(remainJobs.containsAll(context.getJobStartTimes().keySet()));
+
+ for (String job : deletedJobs) {
+ JobConfig cfg = _driver.getJobConfig(job);
+ JobContext ctx = _driver.getJobContext(job);
+ Assert.assertNull(cfg);
+ Assert.assertNull(ctx);
+ }
+
+ }
}