You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/09 18:49:09 UTC
[1/2] git commit: [HELIX-440] One-time scheduling for task framework
Repository: helix
Updated Branches:
refs/heads/helix-provisioning 0272e3701 -> 0f79187d3
[HELIX-440] One-time scheduling for task framework
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/346d8a32
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/346d8a32
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/346d8a32
Branch: refs/heads/helix-provisioning
Commit: 346d8a32ed91db9ce182d5cea911769a23654d0b
Parents: 0272e37
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Jun 5 09:37:31 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jul 9 09:36:14 2014 -0700
----------------------------------------------------------------------
.../org/apache/helix/task/ScheduleConfig.java | 165 +++++++++++++++++++
.../org/apache/helix/task/TaskRebalancer.java | 74 +++++++++
.../java/org/apache/helix/task/TaskUtil.java | 12 ++
.../java/org/apache/helix/task/Workflow.java | 33 ++++
.../org/apache/helix/task/WorkflowConfig.java | 55 ++++++-
.../apache/helix/task/beans/ScheduleBean.java | 32 ++++
.../apache/helix/task/beans/WorkflowBean.java | 1 +
.../task/TestIndependentTaskRebalancer.java | 34 ++++
8 files changed, 404 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
new file mode 100644
index 0000000..9e3801e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
@@ -0,0 +1,165 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.task.beans.ScheduleBean;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration for scheduling both one-time and recurring workflows in Helix
+ */
+public class ScheduleConfig {
+ private static final Logger LOG = Logger.getLogger(ScheduleConfig.class);
+
+ /** Enforce that a workflow can recur at most once per minute */
+ private static final long MIN_RECURRENCE_MILLIS = 60 * 1000;
+
+ private final Date _startTime;
+ private final TimeUnit _recurUnit;
+ private final Long _recurInterval;
+
+ private ScheduleConfig(Date startTime, TimeUnit recurUnit, Long recurInterval) {
+ _startTime = startTime;
+ _recurUnit = recurUnit;
+ _recurInterval = recurInterval;
+ }
+
+ /**
+ * When the workflow should be started
+ * @return Date object representing the start time
+ */
+ public Date getStartTime() {
+ return _startTime;
+ }
+
+ /**
+ * The unit of the recurrence interval if this is a recurring workflow
+ * @return the recurrence interval unit, or null if this workflow is a one-time workflow
+ */
+ public TimeUnit getRecurrenceUnit() {
+ return _recurUnit;
+ }
+
+ /**
+ * The magnitude of the recurrence interval if this is a recurring workflow
+ * @return the recurrence interval magnitude, or null if this workflow is a one-time workflow
+ */
+ public Long getRecurrenceInterval() {
+ return _recurInterval;
+ }
+
+ /**
+ * Check if this workflow is recurring, i.e. both a recurrence unit and interval are set
+ * @return true if recurring, false if one-time
+ */
+ public boolean isRecurring() {
+ return _recurUnit != null && _recurInterval != null;
+ }
+
+ /**
+ * Check if the configured schedule is valid given these constraints:
+ * <ul>
+ * <li>All workflows must have a start time</li>
+ * <li>Recurrence unit and interval must both be present if either is present</li>
+ * <li>Recurring workflows must have a positive interval magnitude</li>
+ * <li>Intervals must be at least one minute</li>
+ * </ul>
+ * Recurring schedules (both unit and interval set) are rejected before any other check, so the
+ * two interval constraints above cannot fail on their own while that guard is in place.
+ * Every rejection is logged at error level.
+ * @return true if valid, false if invalid
+ */
+ public boolean isValid() {
+ // For now, disallow recurring workflows
+ if (isRecurring()) {
+ LOG.error("Recurring workflows are not currently supported.");
+ return false;
+ }
+
+ // All schedules must have a start time even if they are recurring
+ if (_startTime == null) {
+ LOG.error("All schedules must have a start time!");
+ return false;
+ }
+
+ // Recurrence properties must both either be present or absent
+ if ((_recurUnit == null && _recurInterval != null)
+ || (_recurUnit != null && _recurInterval == null)) {
+ LOG.error("Recurrence interval and unit must either both be present or both be absent");
+ return false;
+ }
+
+ // Only positive recurrence intervals are allowed if present
+ if (_recurInterval != null && _recurInterval <= 0) {
+ LOG.error("Recurrence interval must be positive");
+ return false;
+ }
+
+ // Enforce minimum interval length
+ if (_recurUnit != null) {
+ long converted = _recurUnit.toMillis(_recurInterval);
+ if (converted < MIN_RECURRENCE_MILLIS) {
+ LOG.error("Recurrence must be at least " + MIN_RECURRENCE_MILLIS + " ms");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Create this configuration from a serialized bean. The bean's fields are passed through as-is;
+ * no validation happens here, so call {@link #isValid()} on the result.
+ * @param bean flat configuration of the schedule
+ * @return instantiated ScheduleConfig
+ */
+ public static ScheduleConfig from(ScheduleBean bean) {
+ return new ScheduleConfig(bean.startTime, bean.recurUnit, bean.recurInterval);
+ }
+
+ /**
+ * Create a schedule for a workflow that runs once at a specified time
+ * @param startTime the time to start the workflow
+ * @return instantiated ScheduleConfig
+ */
+ public static ScheduleConfig oneTimeDelayedStart(Date startTime) {
+ return new ScheduleConfig(startTime, null, null);
+ }
+
+ /*
+ * Create a schedule for a recurring workflow that should start immediately
+ * @param recurUnit the unit of the recurrence interval
+ * @param recurInterval the magnitude of the recurrence interval
+ * @return instantiated ScheduleConfig
+ * public static ScheduleConfig recurringFromNow(TimeUnit recurUnit, long recurInterval) {
+ * return new ScheduleConfig(new Date(), recurUnit, recurInterval);
+ * }
+ */
+
+ /*
+ * Create a schedule for a recurring workflow that should start at a specific time
+ * @param startTime the time to start the workflow the first time
+ * @param recurUnit the unit of the recurrence interval
+ * @param recurInterval the magnitude of the recurrence interval
+ * @return instantiated ScheduleConfig
+ * public static ScheduleConfig recurringFromDate(Date startTime, TimeUnit recurUnit,
+ * long recurInterval) {
+ * return new ScheduleConfig(startTime, recurUnit, recurInterval);
+ * }
+ */
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 043e7dd..37c8548 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -21,6 +21,7 @@ package org.apache.helix.task;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -29,6 +30,9 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
@@ -50,6 +54,8 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.log4j.Logger;
import com.google.common.base.Joiner;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
@@ -58,6 +64,13 @@ import com.google.common.collect.Sets;
*/
public abstract class TaskRebalancer implements HelixRebalancer {
private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
+
+ /** Management of already-scheduled workflows across jobs */
+ private static final BiMap<String, Date> SCHEDULED_WORKFLOWS = HashBiMap.create();
+ private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
+ .newSingleThreadScheduledExecutor();
+
+ /** For connection management */
private HelixManager _manager;
/**
@@ -116,6 +129,12 @@ public abstract class TaskRebalancer implements HelixRebalancer {
WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
+ // Check for readiness, and stop processing if it's not ready
+ boolean isReady = scheduleIfNotReady(workflowCfg, workflowResource, resourceName);
+ if (!isReady) {
+ return emptyAssignment(resourceName);
+ }
+
// Initialize workflow context if needed
if (workflowCtx == null) {
workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
@@ -422,6 +441,43 @@ public abstract class TaskRebalancer implements HelixRebalancer {
}
/**
+ * Check if a workflow is ready to schedule, and schedule a rebalance if it is not.
+ * A workflow without a config or without a schedule is always treated as ready. Otherwise the
+ * workflow is ready once its start time is no longer in the future; until then a
+ * {@link RebalanceInvoker} for the job resource is queued on the shared executor to fire after
+ * the remaining delay, and the workflow is recorded in SCHEDULED_WORKFLOWS so that it is not
+ * queued twice.
+ * NOTE(review): a workflow whose start time equals that of an already-recorded workflow returns
+ * false without queueing its own invoker -- confirm that the other workflow's invoker is enough.
+ * NOTE(review): SCHEDULED_WORKFLOWS is a static HashBiMap shared by all rebalancer instances --
+ * confirm it is only touched from one thread.
+ * @param workflowCfg the workflow to check
+ * @param workflowResource the Helix resource associated with the workflow
+ * @param jobResource a job from the workflow
+ * @return true if ready, false if not ready
+ */
+ private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, String workflowResource,
+ String jobResource) {
+ // Ignore non-scheduled workflows
+ if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
+ return true;
+ }
+
+ // Figure out when this should be run, and if it's ready, then just run it
+ ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
+ Date startTime = scheduleConfig.getStartTime();
+ long delay = startTime.getTime() - new Date().getTime();
+ if (delay <= 0) {
+ SCHEDULED_WORKFLOWS.remove(workflowResource);
+ SCHEDULED_WORKFLOWS.inverse().remove(startTime);
+ return true;
+ }
+
+ // No need to schedule the same runnable at the same time
+ if (SCHEDULED_WORKFLOWS.containsKey(workflowResource)
+ || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) {
+ return false;
+ }
+
+ // For workflows not yet scheduled, schedule them and record it
+ RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
+ SCHEDULED_WORKFLOWS.put(workflowResource, startTime);
+ SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delay, TimeUnit.MILLISECONDS);
+ return false;
+ }
+
+ /**
* Checks if the job has completed.
* @param ctx The rebalancer context.
* @param allPartitions The set of partitions to check.
@@ -660,4 +716,22 @@ public abstract class TaskRebalancer implements HelixRebalancer {
_state = state;
}
}
+
+ /**
+ * The simplest possible runnable that will trigger a run of the controller pipeline: when run,
+ * it calls {@link TaskUtil#invokeRebalance} with the manager and resource it was built with
+ */
+ private static class RebalanceInvoker implements Runnable {
+ private final HelixManager _manager;
+ private final String _resource;
+
+ public RebalanceInvoker(HelixManager manager, String resource) {
+ _manager = manager;
+ _resource = resource;
+ }
+
+ @Override
+ public void run() {
+ TaskUtil.invokeRebalance(_manager, _resource);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 96b7e55..43a1741 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -36,6 +36,7 @@ import org.apache.helix.api.State;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.log4j.Logger;
@@ -182,6 +183,17 @@ public class TaskUtil {
return Collections.emptyMap();
}
+ /**
+ * Trigger a controller pipeline execution for a given resource.
+ * @param manager Helix connection
+ * @param resource the name of the resource whose ideal state is touched to trigger the execution
+ */
+ public static void invokeRebalance(HelixManager manager, String resource) {
+ // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ accessor.updateProperty(accessor.keyBuilder().idealStates(resource), new IdealState(resource));
+ }
+
private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
ConfigAccessor configAccessor = manager.getConfigAccessor();
http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 70fb82c..fef0274 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -27,6 +27,7 @@ import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -82,12 +83,31 @@ public class Workflow {
return _taskConfigs;
}
+ public WorkflowConfig getWorkflowConfig() {
+ return _workflowConfig;
+ }
+
public Map<String, String> getResourceConfigMap() throws Exception {
Map<String, String> cfgMap = new HashMap<String, String>();
cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getJobDag().toJson());
cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(_workflowConfig.getExpiry()));
cfgMap.put(WorkflowConfig.TARGET_STATE, _workflowConfig.getTargetState().name());
+ // Populate schedule if present
+ ScheduleConfig scheduleConfig = _workflowConfig.getScheduleConfig();
+ if (scheduleConfig != null) {
+ Date startTime = scheduleConfig.getStartTime();
+ if (startTime != null) {
+ String formattedTime = WorkflowConfig.DEFAULT_DATE_FORMAT.format(startTime);
+ cfgMap.put(WorkflowConfig.START_TIME, formattedTime);
+ }
+ if (scheduleConfig.isRecurring()) {
+ cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, scheduleConfig.getRecurrenceUnit().toString());
+ cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, scheduleConfig.getRecurrenceInterval()
+ .toString());
+ }
+ }
+
return cfgMap;
}
@@ -198,6 +218,10 @@ public class Workflow {
}
}
+ if (wf.schedule != null) {
+ builder.setScheduleConfig(ScheduleConfig.from(wf.schedule));
+ }
+
return builder.build();
}
@@ -235,6 +259,7 @@ public class Workflow {
private JobDag _dag;
private Map<String, Map<String, String>> _jobConfigs;
private Map<String, List<TaskConfig>> _taskConfigs;
+ private ScheduleConfig _scheduleConfig;
private long _expiry;
public Builder(String name) {
@@ -291,6 +316,11 @@ public class Workflow {
return this;
}
+ public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
+ _scheduleConfig = scheduleConfig;
+ return this;
+ }
+
public Builder setExpiry(long expiry) {
_expiry = expiry;
return this;
@@ -309,6 +339,9 @@ public class Workflow {
WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
builder.setTaskDag(_dag);
builder.setTargetState(TargetState.START);
+ if (_scheduleConfig != null) {
+ builder.setScheduleConfig(_scheduleConfig);
+ }
if (_expiry > 0) {
builder.setExpiry(_expiry);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index ff4a2a9..a8aff1f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -19,29 +19,48 @@ package org.apache.helix.task;
* under the License.
*/
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.log4j.Logger;
/**
* Provides a typed interface to workflow level configurations. Validates the configurations.
*/
public class WorkflowConfig {
+ private static final Logger LOG = Logger.getLogger(WorkflowConfig.class);
+
/* Config fields */
public static final String DAG = "Dag";
public static final String TARGET_STATE = "TargetState";
public static final String EXPIRY = "Expiry";
+ public static final String START_TIME = "StartTime";
+ public static final String RECURRENCE_UNIT = "RecurrenceUnit";
+ public static final String RECURRENCE_INTERVAL = "RecurrenceInterval";
/* Default values */
public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
+ public static final SimpleDateFormat DEFAULT_DATE_FORMAT = new SimpleDateFormat(
+ "MM-dd-yyyy HH:mm:ss");
+ static {
+ DEFAULT_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
+ }
/* Member variables */
private JobDag _jobDag;
private TargetState _targetState;
private long _expiry;
+ private ScheduleConfig _scheduleConfig;
- private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry) {
+ private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry,
+ ScheduleConfig scheduleConfig) {
_jobDag = jobDag;
_targetState = targetState;
_expiry = expiry;
+ _scheduleConfig = scheduleConfig;
}
public JobDag getJobDag() {
@@ -56,10 +75,15 @@ public class WorkflowConfig {
return _expiry;
}
+ public ScheduleConfig getScheduleConfig() {
+ return _scheduleConfig;
+ }
+
public static class Builder {
private JobDag _taskDag = JobDag.EMPTY_DAG;
private TargetState _targetState = TargetState.START;
private long _expiry = DEFAULT_EXPIRY;
+ private ScheduleConfig _scheduleConfig;
public Builder() {
// Nothing to do
@@ -68,7 +92,7 @@ public class WorkflowConfig {
public WorkflowConfig build() {
validate();
- return new WorkflowConfig(_taskDag, _targetState, _expiry);
+ return new WorkflowConfig(_taskDag, _targetState, _expiry, _scheduleConfig);
}
public Builder setTaskDag(JobDag v) {
@@ -86,6 +110,11 @@ public class WorkflowConfig {
return this;
}
+ public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
+ _scheduleConfig = scheduleConfig;
+ return this;
+ }
+
public static Builder fromMap(Map<String, String> cfg) {
Builder b = new Builder();
@@ -103,6 +132,24 @@ public class WorkflowConfig {
b.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE)));
}
+ // Parse schedule-specific configs, if they exist
+ Date startTime = null;
+ if (cfg.containsKey(START_TIME)) {
+ try {
+ startTime = DEFAULT_DATE_FORMAT.parse(cfg.get(START_TIME));
+ } catch (ParseException e) {
+ LOG.error("Unparseable date " + cfg.get(START_TIME), e);
+ }
+ }
+ if (cfg.containsKey(RECURRENCE_UNIT) && cfg.containsKey(RECURRENCE_INTERVAL)) {
+ /*
+ * b.setScheduleConfig(ScheduleConfig.recurringFromDate(startTime,
+ * TimeUnit.valueOf(cfg.get(RECURRENCE_UNIT)),
+ * Long.parseLong(cfg.get(RECURRENCE_INTERVAL))));
+ */
+ } else if (startTime != null) {
+ b.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(startTime));
+ }
return b;
}
@@ -110,6 +157,10 @@ public class WorkflowConfig {
if (_expiry < 0) {
throw new IllegalArgumentException(
String.format("%s has invalid value %s", EXPIRY, _expiry));
+ } else if (_scheduleConfig != null && !_scheduleConfig.isValid()) {
+ throw new IllegalArgumentException(
+ "Scheduler configuration is invalid. The configuration must have a start time if it is "
+ + "one-time, and it must have a positive interval magnitude if it is recurring");
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java
new file mode 100644
index 0000000..9e843f5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java
@@ -0,0 +1,32 @@
+package org.apache.helix.task.beans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A bean representing how a workflow can be scheduled in Helix.
+ * It carries the start time plus the magnitude (recurInterval) and unit (recurUnit) of an
+ * optional recurrence interval; ScheduleConfig.from(ScheduleBean) reads all three fields.
+ */
+public class ScheduleBean {
+ public Date startTime;
+ public Long recurInterval;
+ public TimeUnit recurUnit;
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
index 76da4c8..2ea23c7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -28,4 +28,5 @@ public class WorkflowBean {
public String name;
public String expiry;
public List<JobBean> jobs;
+ public ScheduleBean schedule;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 006c3fe..1196f41 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration.task;
* under the License.
*/
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.ScheduleConfig;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConfig;
@@ -44,7 +46,9 @@ import org.apache.helix.task.TaskResult;
import org.apache.helix.task.TaskResult.Status;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -246,6 +250,36 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertTrue(_runCounts.values().contains(1));
}
+ @Test
+ public void testOneTimeScheduled() throws Exception {
+ String jobName = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+ List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
+ Map<String, String> taskConfigMap = Maps.newHashMap();
+ TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
+ taskConfigs.add(taskConfig1);
+ workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+ workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+ Map<String, String> jobConfigMap = Maps.newHashMap();
+ jobConfigMap.put("Timeout", "1000");
+ workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+ long inFiveSeconds = System.currentTimeMillis() + (5 * 1000);
+ workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds)));
+ _driver.start(workflowBuilder.build());
+
+ // Ensure the workflow is first seen in progress and then completes
+ TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+ TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+ // Ensure that the class was invoked
+ Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
+
+ // Check that the workflow only started after the start time (with a 1 second buffer)
+ WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, jobName);
+ long startTime = workflowCtx.getStartTime();
+ Assert.assertTrue((startTime + 1000) >= inFiveSeconds);
+ }
+
private class TaskOne extends ReindexTask {
private final boolean _shouldFail;
private final String _instanceName;
[2/2] git commit: [HELIX-416] Support recurring scheduled tasks
Posted by ka...@apache.org.
[HELIX-416] Support recurring scheduled tasks
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0f79187d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0f79187d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0f79187d
Branch: refs/heads/helix-provisioning
Commit: 0f79187d31c8769a668ba81f8bcc5e5831c659da
Parents: 346d8a3
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Jun 23 13:59:59 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jul 9 09:48:57 2014 -0700
----------------------------------------------------------------------
.../org/apache/helix/task/ScheduleConfig.java | 29 ++-
.../org/apache/helix/task/TaskConstants.java | 4 +
.../java/org/apache/helix/task/TaskDriver.java | 36 +++-
.../org/apache/helix/task/TaskRebalancer.java | 134 +++++++++---
.../java/org/apache/helix/task/TaskUtil.java | 211 +++++++++++++++++++
.../java/org/apache/helix/task/Workflow.java | 4 +-
.../org/apache/helix/task/WorkflowConfig.java | 39 +---
.../org/apache/helix/task/WorkflowContext.java | 9 +
.../apache/helix/task/beans/WorkflowBean.java | 4 +-
9 files changed, 392 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
index 9e3801e..b123793 100644
--- a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
@@ -87,12 +87,6 @@ public class ScheduleConfig {
* @return true if valid, false if invalid
*/
public boolean isValid() {
- // For now, disallow recurring workflows
- if (isRecurring()) {
- LOG.error("Recurring workflows are not currently supported.");
- return false;
- }
-
// All schedules must have a start time even if they are recurring
if (_startTime == null) {
LOG.error("All schedules must have a start time!");
@@ -141,25 +135,28 @@ public class ScheduleConfig {
return new ScheduleConfig(startTime, null, null);
}
- /*
+ /**
* Create a schedule for a recurring workflow that should start immediately
* @param recurUnit the unit of the recurrence interval
* @param recurInterval the magnitude of the recurrence interval
* @return instantiated ScheduleConfig
- * public static ScheduleConfig recurringFromNow(TimeUnit recurUnit, long recurInterval) {
- * return new ScheduleConfig(new Date(), recurUnit, recurInterval);
- * }
*/
+ public static ScheduleConfig recurringFromNow(TimeUnit recurUnit, long recurInterval) {
+ return new ScheduleConfig(new Date(), recurUnit, recurInterval);
+ }
- /*
+ /**
* Create a schedule for a recurring workflow that should start at a specific time
- * @param startTime the time to start the workflow the first time
+ * @param startTime the time to start the workflow the first time, or null if now
* @param recurUnit the unit of the recurrence interval
* @param recurInterval the magnitude of the recurrence interval
* @return instantiated ScheduleConfig
- * public static ScheduleConfig recurringFromDate(Date startTime, TimeUnit recurUnit,
- * long recurInterval) {
- * return new ScheduleConfig(startTime, recurUnit, recurInterval);
- * }
*/
+ public static ScheduleConfig recurringFromDate(Date startTime, TimeUnit recurUnit,
+ long recurInterval) {
+ if (startTime == null) {
+ startTime = new Date();
+ }
+ return new ScheduleConfig(startTime, recurUnit, recurInterval);
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index 305323d..34008d6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -39,4 +39,8 @@ public class TaskConstants {
* The root property store path at which the {@link TaskRebalancer} stores context information.
*/
public static final String REBALANCER_CONTEXT_ROOT = "/TaskRebalancer";
+ /**
+ * Resource prefix for scheduled workflows
+ */
+ public static final String SCHEDULED = "SCHEDULED";
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index d5e9101..0610c01 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.I0Itec.zkclient.DataUpdater;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@@ -34,6 +35,7 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -41,6 +43,7 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.log4j.Logger;
@@ -221,11 +224,38 @@ public class TaskDriver {
/** Helper function to change target state for a given task */
private void setTaskTargetState(String jobResource, TargetState state) {
+ setSingleTaskTargetState(jobResource, state);
+
+ // For recurring schedules, child workflows must also be handled. Children are named
+ // <jobResource>_SCHEDULED_<n>, so the prefix has to come from the parent's name; a prefix
+ // built from the candidate's own name can never match.
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- HelixProperty p = new HelixProperty(jobResource);
- p.getRecord().setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
- accessor.updateProperty(accessor.keyBuilder().resourceConfig(jobResource), p);
+ String prefix = jobResource + "_" + TaskConstants.SCHEDULED;
+ List<String> resources = accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+ for (String resource : resources) {
+ if (resource.startsWith(prefix)) {
+ setSingleTaskTargetState(resource, state);
+ }
+ }
+ }
+ /**
+ * Change the target state stored in a single resource config, then call invokeRebalance().
+ * The state is only written when the record has no finish time or an unfinished one.
+ * @param jobResource the resource whose config is updated
+ * @param state the target state to record
+ */
+ private void setSingleTaskTargetState(String jobResource, final TargetState state) {
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ // NOTE(review): assumes the updater may be handed null when no config exists yet; guard
+ // so that case cannot fail with a NullPointerException
+ if (currentData == null) {
+ return null;
+ }
+ // Only update target state for non-completed workflows. UNFINISHED is an int, so it is
+ // rendered as text first; String.equals against the boxed int could never be true.
+ String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME);
+ if (finishTime == null || finishTime.equals(String.valueOf(WorkflowContext.UNFINISHED))) {
+ currentData.setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
+ }
+ return currentData;
+ }
+ };
+ List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
+ updaters.add(updater);
+ List<String> paths = Lists.newArrayList();
+ paths.add(accessor.keyBuilder().resourceConfig(jobResource).getPath());
+ accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
invokeRebalance();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 37c8548..0e11d21 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
@@ -57,6 +58,7 @@ import com.google.common.base.Joiner;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
@@ -65,12 +67,12 @@ import com.google.common.collect.Sets;
public abstract class TaskRebalancer implements HelixRebalancer {
private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
- /** Management of already-scheduled workflows across jobs */
+ // Management of already-scheduled workflows across jobs
private static final BiMap<String, Date> SCHEDULED_WORKFLOWS = HashBiMap.create();
private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
.newSingleThreadScheduledExecutor();
- /** For connection management */
+ // For connection management
private HelixManager _manager;
/**
@@ -129,12 +131,6 @@ public abstract class TaskRebalancer implements HelixRebalancer {
WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
- // Check for readiness, and stop processing if it's not ready
- boolean isReady = scheduleIfNotReady(workflowCfg, workflowResource, resourceName);
- if (!isReady) {
- return emptyAssignment(resourceName);
- }
-
// Initialize workflow context if needed
if (workflowCtx == null) {
workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
@@ -145,7 +141,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
for (String parent : workflowCfg.getJobDag().getDirectParents(resourceName)) {
if (workflowCtx.getJobState(parent) == null
|| !workflowCtx.getJobState(parent).equals(TaskState.COMPLETED)) {
- return emptyAssignment(resourceName);
+ return emptyAssignment(resourceName, currStateOutput);
}
}
@@ -153,7 +149,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
cleanup(_manager, resourceName, workflowCfg, workflowResource);
- return emptyAssignment(resourceName);
+ return emptyAssignment(resourceName, currStateOutput);
}
// Check if this workflow has been finished past its expiry.
@@ -161,7 +157,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
&& workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
markForDeletion(_manager, workflowResource);
cleanup(_manager, resourceName, workflowCfg, workflowResource);
- return emptyAssignment(resourceName);
+ return emptyAssignment(resourceName, currStateOutput);
}
// Fetch any existing context information from the property store.
@@ -174,9 +170,17 @@ public abstract class TaskRebalancer implements HelixRebalancer {
// The job is already in a final state (completed/failed).
if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
|| workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
- return emptyAssignment(resourceName);
+ return emptyAssignment(resourceName, currStateOutput);
+ }
+
+ // Check for readiness, and stop processing if it's not ready
+ boolean isReady =
+ scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData);
+ if (!isReady) {
+ return emptyAssignment(resourceName, currStateOutput);
}
+ // Grab the old assignment, or an empty one if it doesn't exist
ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
if (prevAssignment == null) {
prevAssignment = new ResourceAssignment(ResourceId.from(resourceName));
@@ -359,8 +363,9 @@ public abstract class TaskRebalancer implements HelixRebalancer {
if (!successOptional) {
workflowCtx.setJobState(jobResource, TaskState.FAILED);
workflowCtx.setWorkflowState(TaskState.FAILED);
+ workflowCtx.setFinishTime(System.currentTimeMillis());
addAllPartitions(allPartitions, partitionsToDropFromIs);
- return emptyAssignment(jobResource);
+ return emptyAssignment(jobResource, currStateOutput);
} else {
skippedPartitions.add(pId);
partitionsToDropFromIs.add(pId);
@@ -443,12 +448,14 @@ public abstract class TaskRebalancer implements HelixRebalancer {
/**
* Check if a workflow is ready to schedule, and schedule a rebalance if it is not
* @param workflowCfg the workflow to check
+ * @param workflowCtx the current workflow context
* @param workflowResource the Helix resource associated with the workflow
* @param jobResource a job from the workflow
+ * @param cache the current snapshot of the cluster
* @return true if ready, false if not ready
*/
- private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, String workflowResource,
- String jobResource) {
+ private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+ String workflowResource, String jobResource, Cluster cache) {
// Ignore non-scheduled workflows
if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
return true;
@@ -457,11 +464,66 @@ public abstract class TaskRebalancer implements HelixRebalancer {
// Figure out when this should be run, and if it's ready, then just run it
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
Date startTime = scheduleConfig.getStartTime();
- long delay = startTime.getTime() - new Date().getTime();
- if (delay <= 0) {
- SCHEDULED_WORKFLOWS.remove(workflowResource);
- SCHEDULED_WORKFLOWS.inverse().remove(startTime);
- return true;
+ long currentTime = new Date().getTime();
+ long delayFromStart = startTime.getTime() - currentTime;
+
+ if (delayFromStart <= 0) {
+ // Remove any timers that are past-time for this workflow
+ Date scheduledTime = SCHEDULED_WORKFLOWS.get(workflowResource);
+ if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+ SCHEDULED_WORKFLOWS.remove(workflowResource);
+ }
+
+ // Recurring workflows are just templates that spawn new workflows
+ if (scheduleConfig.isRecurring()) {
+ // Skip scheduling this workflow if it's not in a start state
+ if (!workflowCfg.getTargetState().equals(TargetState.START)) {
+ return false;
+ }
+
+ // Skip scheduling this workflow again if the previous run (if any) is still active
+ String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
+ if (lastScheduled != null) {
+ WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled);
+ if (lastWorkflowCtx == null
+ || lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+ return false;
+ }
+ }
+
+ // Figure out how many jumps are needed, thus the time to schedule the next workflow
+ // The negative of the delay is the amount of time past the start time
+ long period =
+ scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
+ long offsetMultiplier = (-delayFromStart) / period;
+ long timeToSchedule = period * offsetMultiplier + startTime.getTime();
+
+ // Now clone the workflow if this clone has not yet been created
+ String newWorkflowName =
+ workflowResource + "_" + TaskConstants.SCHEDULED + "_" + offsetMultiplier;
+ if (lastScheduled == null || !lastScheduled.equals(newWorkflowName)) {
+ Workflow clonedWf =
+ TaskUtil.cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date(
+ timeToSchedule));
+ TaskDriver driver = new TaskDriver(_manager);
+ try {
+ // Start the cloned workflow
+ driver.start(clonedWf);
+ } catch (Exception e) {
+ LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
+ }
+ // Persist workflow start regardless of success to avoid retrying and failing
+ workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
+ TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
+ }
+
+ // Change the time to trigger the pipeline to that of the next run
+ startTime = new Date(timeToSchedule + period);
+ delayFromStart = startTime.getTime() - System.currentTimeMillis();
+ } else {
+ // This is a one-time workflow and is ready
+ return true;
+ }
}
// No need to schedule the same runnable at the same time
@@ -470,11 +532,22 @@ public abstract class TaskRebalancer implements HelixRebalancer {
return false;
}
+ scheduleRebalance(workflowResource, jobResource, startTime, delayFromStart);
+ return false;
+ }
+
+ /**
+ * Queue a delayed RebalanceInvoker for a job unless a timer is already recorded.
+ * @param workflowResource the workflow name, used as the key in SCHEDULED_WORKFLOWS
+ * @param jobResource the job handed to the RebalanceInvoker
+ * @param startTime the time recorded against the workflow in SCHEDULED_WORKFLOWS
+ * @param delayFromStart how long to wait before the invoker runs, in milliseconds
+ */
+ private void scheduleRebalance(String workflowResource, String jobResource, Date startTime,
+ long delayFromStart) {
+ // No need to schedule the same runnable at the same time
+ // NOTE(review): the inverse lookup also skips a different workflow that happens to share
+ // this exact start time - confirm that is intended
+ if (SCHEDULED_WORKFLOWS.containsKey(workflowResource)
+ || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) {
+ return;
+ }
+
// For workflows not yet scheduled, schedule them and record it
RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
SCHEDULED_WORKFLOWS.put(workflowResource, startTime);
- SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delay, TimeUnit.MILLISECONDS);
- return false;
+ SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS);
}
/**
@@ -620,8 +693,21 @@ public abstract class TaskRebalancer implements HelixRebalancer {
}
}
+ /**
+ * Build an assignment for a resource that should no longer run anything: every participant
+ * that has a current state for one of the resource's partitions is mapped to DROPPED.
+ * @param name the resource name
+ * @param currStateOutput the current states, used to find the partitions and participants
+ * @return an assignment holding a DROPPED replica map for each currently-mapped partition
+ */
- private static ResourceAssignment emptyAssignment(String name) {
- return new ResourceAssignment(ResourceId.from(name));
+ private static ResourceAssignment emptyAssignment(String name,
+ ResourceCurrentState currStateOutput) {
+ ResourceId resourceId = ResourceId.from(name);
+ ResourceAssignment assignment = new ResourceAssignment(resourceId);
+ Set<PartitionId> partitions = currStateOutput.getCurrentStateMappedPartitions(resourceId);
+ for (PartitionId partition : partitions) {
+ Map<ParticipantId, State> currentStateMap =
+ currStateOutput.getCurrentStateMap(resourceId, partition);
+ Map<ParticipantId, State> replicaMap = Maps.newHashMap();
+ // Only the participant ids are used; the state each one currently holds is ignored
+ for (ParticipantId participantId : currentStateMap.keySet()) {
+ replicaMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+ }
+ assignment.addReplicaMap(partition, replicaMap);
+ }
+ return assignment;
}
private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 43a1741..b8582b1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -20,10 +20,14 @@ package org.apache.helix.task;
*/
import java.io.IOException;
+import java.text.ParseException;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
@@ -44,6 +48,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -63,6 +68,9 @@ public class TaskUtil {
*/
public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
HelixProperty jobResourceConfig = getResourceConfig(manager, jobResource);
+ if (jobResourceConfig == null) {
+ return null;
+ }
JobConfig.Builder b =
JobConfig.Builder.fromMap(jobResourceConfig.getRecord().getSimpleFields());
Map<String, Map<String, String>> rawTaskConfigMap =
@@ -76,13 +84,33 @@ public class TaskUtil {
return b.build();
}
+ /**
+ * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
+ * @param manager Helix manager object used to connect to Helix.
+ * @param workflowResource The name of the workflow resource.
+ * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
+ * workflow, null otherwise.
+ */
public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
+ if (workflowCfg == null) {
+ return null;
+ }
WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg);
return b.build();
}
+ /**
+ * Request a state change for a specific task.
+ * @param accessor connected Helix data accessor
+ * @param instance the instance serving the task
+ * @param sessionId the current session of the instance
+ * @param resource the job name
+ * @param partition the task partition name
+ * @param state the requested state
+ * @return true if the request was persisted, false otherwise
+ */
public static boolean setRequestedState(HelixDataAccessor accessor, String instance,
String sessionId, String resource, String partition, TaskPartitionState state) {
LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state,
@@ -101,11 +129,23 @@ public class TaskUtil {
}
}
+ /**
+ * Get a Helix configuration scope at a resource (i.e. job and workflow) level
+ * @param clusterName the cluster containing the resource
+ * @param resource the resource name
+ * @return instantiated {@link HelixConfigScope}
+ */
public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) {
return new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
.forCluster(clusterName).forResource(resource).build();
}
+ /**
+ * Get the last task assignment for a given job
+ * @param manager a connection to Helix
+ * @param resourceName the name of the job
+ * @return {@link ResourceAssignment} instance, or null if no assignment is available
+ */
public static ResourceAssignment getPrevResourceAssignment(HelixManager manager,
String resourceName) {
ZNRecord r =
@@ -115,6 +155,12 @@ public class TaskUtil {
return r != null ? new ResourceAssignment(r) : null;
}
+ /**
+ * Set the last task assignment for a given job
+ * @param manager a connection to Helix
+ * @param resourceName the name of the job
+ * @param ra {@link ResourceAssignment} containing the task assignment
+ */
public static void setPrevResourceAssignment(HelixManager manager, String resourceName,
ResourceAssignment ra) {
manager.getHelixPropertyStore().set(
@@ -122,6 +168,12 @@ public class TaskUtil {
ra.getRecord(), AccessOption.PERSISTENT);
}
+ /**
+ * Get the runtime context of a single job
+ * @param manager a connection to Helix
+ * @param jobResource the name of the job
+ * @return the {@link JobContext}, or null if none is available
+ */
public static JobContext getJobContext(HelixManager manager, String jobResource) {
ZNRecord r =
manager.getHelixPropertyStore().get(
@@ -130,12 +182,24 @@ public class TaskUtil {
return r != null ? new JobContext(r) : null;
}
+ /**
+ * Set the runtime context of a single job
+ * @param manager a connection to Helix
+ * @param jobResource the name of the job
+ * @param ctx the up-to-date {@link JobContext} for the job
+ */
public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) {
manager.getHelixPropertyStore().set(
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
ctx.getRecord(), AccessOption.PERSISTENT);
}
+ /**
+ * Get the runtime context of a single workflow
+ * @param manager a connection to Helix
+ * @param workflowResource the name of the workflow
+ * @return the {@link WorkflowContext}, or null if none is available
+ */
public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
ZNRecord r =
manager.getHelixPropertyStore().get(
@@ -144,6 +208,12 @@ public class TaskUtil {
return r != null ? new WorkflowContext(r) : null;
}
+ /**
+ * Set the runtime context of a single workflow
+ * @param manager a connection to Helix
+ * @param workflowResource the name of the workflow
+ * @param ctx the up-to-date {@link WorkflowContext} for the workflow
+ */
public static void setWorkflowContext(HelixManager manager, String workflowResource,
WorkflowContext ctx) {
manager.getHelixPropertyStore().set(
@@ -151,14 +221,45 @@ public class TaskUtil {
ctx.getRecord(), AccessOption.PERSISTENT);
}
+ /**
+ * Get a workflow-qualified job name for a single-job workflow
+ * @param singleJobWorkflow the name of the single-job workflow
+ * @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow
+ */
public static String getNamespacedJobName(String singleJobWorkflow) {
return getNamespacedJobName(singleJobWorkflow, singleJobWorkflow);
}
+ /**
+ * Get a workflow-qualified job name for a job in that workflow
+ * @param workflowResource the name of the workflow
+ * @param jobName the un-namespaced name of the job
+ * @return The namespaced job name, which is just workflowResource_jobName
+ */
public static String getNamespacedJobName(String workflowResource, String jobName) {
return workflowResource + "_" + jobName;
}
+ /**
+ * Remove the workflow namespace from the job name
+ * @param workflowResource the name of the workflow that owns the job
+ * @param jobName the namespaced job name
+ * @return the denamespaced job name, or the same job name if it does not start with the
+ * workflow namespace
+ */
+ public static String getDenamespacedJobName(String workflowResource, String jobName) {
+ // Namespaced names have the form workflowResource_jobName, so only a leading
+ // "workflowResource_" is stripped. A workflow name that merely occurs somewhere inside the
+ // job name, or a job name equal to the workflow name, is returned unchanged.
+ String namespacePrefix = workflowResource + "_";
+ if (jobName.startsWith(namespacePrefix)) {
+ return jobName.substring(namespacePrefix.length());
+ }
+ return jobName;
+ }
+
+ /**
+ * Serialize a map of job-level configurations as a single string
+ * @param commandConfig map of job config key to config value
+ * @return serialized string
+ */
public static String serializeJobConfigMap(Map<String, String> commandConfig) {
ObjectMapper mapper = new ObjectMapper();
try {
@@ -170,6 +271,11 @@ public class TaskUtil {
return null;
}
+ /**
+ * Deserialize a single string into a map of job-level configurations
+ * @param commandConfig the serialized job config map
+ * @return a map of job config key to config value
+ */
public static Map<String, String> deserializeJobConfigMap(String commandConfig) {
ObjectMapper mapper = new ObjectMapper();
try {
@@ -194,6 +300,111 @@ public class TaskUtil {
accessor.updateProperty(accessor.keyBuilder().idealStates(resource), new IdealState(resource));
}
+ /**
+ * Build the schedule described by a workflow's string configuration, if there is one
+ * @param cfg workflow config keys mapped to their string values
+ * @return a recurring schedule when both recurrence keys are present, a one-time schedule when
+ * only a start time is present, or null if no schedule is configured or the start time
+ * cannot be parsed
+ */
+ public static ScheduleConfig parseScheduleFromConfigMap(Map<String, String> cfg) {
+ // Resolve the optional start time first; a malformed one voids the whole schedule
+ Date firstRun = null;
+ if (cfg.containsKey(WorkflowConfig.START_TIME)) {
+ String rawStart = cfg.get(WorkflowConfig.START_TIME);
+ try {
+ firstRun = WorkflowConfig.DEFAULT_DATE_FORMAT.parse(rawStart);
+ } catch (ParseException e) {
+ LOG.error("Unparseable date " + rawStart, e);
+ return null;
+ }
+ }
+
+ // Recurrence takes precedence over a plain delayed start
+ boolean recurring = cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT)
+ && cfg.containsKey(WorkflowConfig.RECURRENCE_INTERVAL);
+ if (recurring) {
+ TimeUnit unit = TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT));
+ long interval = Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL));
+ return ScheduleConfig.recurringFromDate(firstRun, unit, interval);
+ }
+ return (firstRun != null) ? ScheduleConfig.oneTimeDelayedStart(firstRun) : null;
+ }
+
+ /**
+ * Create a new workflow based on an existing one
+ * @param manager connection to Helix
+ * @param origWorkflowName the name of the existing workflow
+ * @param newWorkflowName the name of the new workflow
+ * @param newStartTime if non-null, the clone gets a one-time schedule starting at this time;
+ * if null, the schedule is parsed from the original workflow's config
+ * @return the cloned workflow, or null if the original does not exist or the new name is
+ * already taken
+ */
+ public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
+ String newWorkflowName, Date newStartTime) {
+ // Read all resources, including the workflow and jobs of interest
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ Map<String, HelixProperty> resourceConfigMap =
+ accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+ if (!resourceConfigMap.containsKey(origWorkflowName)) {
+ LOG.error("No such workflow named " + origWorkflowName);
+ return null;
+ }
+ if (resourceConfigMap.containsKey(newWorkflowName)) {
+ LOG.error("Workflow with name " + newWorkflowName + " already exists!");
+ return null;
+ }
+
+ // Create a new workflow with a new name
+ HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
+ Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
+ JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
+ Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
+ Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
+
+ // Set the workflow expiry only if the original declares one; parsing a missing value
+ // would throw, and without it the builder keeps its own default expiry
+ String expiry = wfSimpleFields.get(WorkflowConfig.EXPIRY);
+ if (expiry != null) {
+ builder.setExpiry(Long.parseLong(expiry));
+ }
+
+ // Set the schedule, if applicable
+ ScheduleConfig scheduleConfig;
+ if (newStartTime != null) {
+ scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
+ } else {
+ scheduleConfig = parseScheduleFromConfigMap(wfSimpleFields);
+ }
+ if (scheduleConfig != null) {
+ builder.setScheduleConfig(scheduleConfig);
+ }
+
+ // Add each job back as long as the original exists
+ Set<String> namespacedJobs = jobDag.getAllNodes();
+ for (String namespacedJob : namespacedJobs) {
+ if (resourceConfigMap.containsKey(namespacedJob)) {
+ // Copy over job-level and task-level configs
+ String job = getDenamespacedJobName(origWorkflowName, namespacedJob);
+ HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
+ Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
+ jobSimpleFields.put(JobConfig.WORKFLOW_ID, newWorkflowName); // overwrite workflow name
+ for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
+ builder.addConfig(job, e.getKey(), e.getValue());
+ }
+ Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
+ List<TaskConfig> taskConfigs = Lists.newLinkedList();
+ for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
+ TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+ taskConfigs.add(taskConfig);
+ }
+ builder.addTaskConfigs(job, taskConfigs);
+
+ // Add dag dependencies
+ Set<String> children = parentsToChildren.get(namespacedJob);
+ if (children != null) {
+ for (String namespacedChild : children) {
+ String child = getDenamespacedJobName(origWorkflowName, namespacedChild);
+ builder.addParentChildDependency(job, child);
+ }
+ }
+ }
+ }
+ return builder.build();
+ }
+
private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
ConfigAccessor configAccessor = manager.getConfigAccessor();
http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index fef0274..320c020 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -221,6 +221,7 @@ public class Workflow {
if (wf.schedule != null) {
builder.setScheduleConfig(ScheduleConfig.from(wf.schedule));
}
+ builder.setExpiry(wf.expiry);
return builder.build();
}
@@ -267,7 +268,7 @@ public class Workflow {
_dag = new JobDag();
_jobConfigs = new TreeMap<String, Map<String, String>>();
_taskConfigs = new TreeMap<String, List<TaskConfig>>();
- _expiry = -1;
+ _expiry = WorkflowConfig.DEFAULT_EXPIRY;
}
public Builder addConfig(String job, String key, String val) {
@@ -345,7 +346,6 @@ public class Workflow {
if (_expiry > 0) {
builder.setExpiry(_expiry);
}
-
return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); // calls validate
// internally
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index a8aff1f..782c375 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -19,20 +19,14 @@ package org.apache.helix.task;
* under the License.
*/
-import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.Date;
import java.util.Map;
import java.util.TimeZone;
-import org.apache.log4j.Logger;
-
/**
* Provides a typed interface to workflow level configurations. Validates the configurations.
*/
public class WorkflowConfig {
- private static final Logger LOG = Logger.getLogger(WorkflowConfig.class);
-
/* Config fields */
public static final String DAG = "Dag";
public static final String TARGET_STATE = "TargetState";
@@ -50,10 +44,10 @@ public class WorkflowConfig {
}
/* Member variables */
- private JobDag _jobDag;
- private TargetState _targetState;
- private long _expiry;
- private ScheduleConfig _scheduleConfig;
+ private final JobDag _jobDag;
+ private final TargetState _targetState;
+ private final long _expiry;
+ private final ScheduleConfig _scheduleConfig;
private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry,
ScheduleConfig scheduleConfig) {
@@ -85,10 +79,6 @@ public class WorkflowConfig {
private long _expiry = DEFAULT_EXPIRY;
private ScheduleConfig _scheduleConfig;
- public Builder() {
- // Nothing to do
- }
-
public WorkflowConfig build() {
validate();
@@ -117,11 +107,9 @@ public class WorkflowConfig {
public static Builder fromMap(Map<String, String> cfg) {
Builder b = new Builder();
-
if (cfg == null) {
return b;
}
-
if (cfg.containsKey(EXPIRY)) {
b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
}
@@ -133,22 +121,9 @@ public class WorkflowConfig {
}
// Parse schedule-specific configs, if they exist
- Date startTime = null;
- if (cfg.containsKey(START_TIME)) {
- try {
- startTime = DEFAULT_DATE_FORMAT.parse(cfg.get(START_TIME));
- } catch (ParseException e) {
- LOG.error("Unparseable date " + cfg.get(START_TIME), e);
- }
- }
- if (cfg.containsKey(RECURRENCE_UNIT) && cfg.containsKey(RECURRENCE_INTERVAL)) {
- /*
- * b.setScheduleConfig(ScheduleConfig.recurringFromDate(startTime,
- * TimeUnit.valueOf(cfg.get(RECURRENCE_UNIT)),
- * Long.parseLong(cfg.get(RECURRENCE_INTERVAL))));
- */
- } else if (startTime != null) {
- b.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(startTime));
+ ScheduleConfig scheduleConfig = TaskUtil.parseScheduleFromConfigMap(cfg);
+ if (scheduleConfig != null) {
+ b.setScheduleConfig(scheduleConfig);
}
return b;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 4feda1b..6ad71a1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -34,6 +34,7 @@ public class WorkflowContext extends HelixProperty {
public static final String START_TIME = "START_TIME";
public static final String FINISH_TIME = "FINISH_TIME";
public static final String TASK_STATES = "TASK_STATES";
+ public static final String LAST_SCHEDULED_WORKFLOW = "LAST_SCHEDULED_WORKFLOW";
public static final int UNFINISHED = -1;
public WorkflowContext(ZNRecord record) {
@@ -106,4 +107,12 @@ public class WorkflowContext extends HelixProperty {
return Long.parseLong(tStr);
}
+
+ /**
+ * Store a workflow name in the LAST_SCHEDULED_WORKFLOW simple field of this context
+ * @param wf the workflow name to store
+ */
+ public void setLastScheduledSingleWorkflow(String wf) {
+ _record.setSimpleField(LAST_SCHEDULED_WORKFLOW, wf);
+ }
+
+ /**
+ * Read the workflow name held in the LAST_SCHEDULED_WORKFLOW simple field of this context
+ * @return the stored workflow name (presumably null if none has been stored - confirm)
+ */
+ public String getLastScheduledSingleWorkflow() {
+ return _record.getSimpleField(LAST_SCHEDULED_WORKFLOW);
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
index 2ea23c7..a59e818 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -21,12 +21,14 @@ package org.apache.helix.task.beans;
import java.util.List;
+import org.apache.helix.task.WorkflowConfig;
+
/**
* Bean class used for parsing workflow definitions from YAML.
*/
public class WorkflowBean {
public String name;
- public String expiry;
public List<JobBean> jobs;
public ScheduleBean schedule;
+ public long expiry = WorkflowConfig.DEFAULT_EXPIRY;
}