You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:00 UTC
[04/33] helix git commit: Support changes of workflow configuration.
Support changes of workflow configuration.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7a470703
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7a470703
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7a470703
Branch: refs/heads/helix-0.6.x
Commit: 7a47070362c536126e945138f10fb077215f706e
Parents: 3781929
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Jan 26 09:57:43 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:34:59 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/HelixException.java | 4 +
.../main/java/org/apache/helix/task/JobDag.java | 4 +-
.../java/org/apache/helix/task/TaskDriver.java | 133 ++++++++---
.../org/apache/helix/task/WorkflowConfig.java | 24 +-
.../helix/integration/task/TaskTestUtil.java | 36 +--
.../task/TestRunJobsWithMissingTarget.java | 9 +-
.../integration/task/TestUpdateWorkflow.java | 220 +++++++++++++++++++
7 files changed, 382 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/main/java/org/apache/helix/HelixException.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixException.java b/helix-core/src/main/java/org/apache/helix/HelixException.java
index 8693026..26585ed 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixException.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixException.java
@@ -33,4 +33,8 @@ public class HelixException extends RuntimeException {
public HelixException(Throwable cause) {
super(cause);
}
+
+ public HelixException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index f708e91..73a5e58 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -19,12 +19,14 @@ package org.apache.helix.task;
* under the License.
*/
+import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
@@ -141,7 +143,7 @@ public class JobDag {
return ret;
}
- public String toJson() throws Exception {
+ public String toJson() throws IOException {
return new ObjectMapper().writeValueAsString(this);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index c4986ee..193526f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -42,6 +42,7 @@ import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixProperty;
@@ -183,8 +184,11 @@ public class TaskDriver {
helixMgr.disconnect();
}
- /** Schedules a new workflow */
- public void start(Workflow flow) throws Exception {
+ /** Schedules a new workflow
+ *
+ * @param flow
+ */
+ public void start(Workflow flow) {
// TODO: check that namespace for workflow is available
LOG.info("Starting workflow " + flow.getName());
flow.validate();
@@ -206,14 +210,65 @@ public class TaskDriver {
addWorkflowResource(flow.getName());
}
- /** Creates a new named job queue (workflow) */
- public void createQueue(JobQueue queue) throws Exception {
+ /**
+ * Update the configuration of a non-terminable workflow (queue).
+ * The terminable workflow's configuration is not allowed
+ * to change once created.
+ * Note:
+ * For a recurrent workflow, the currently running schedule will not be affected;
+ * the new configuration will be applied to the next scheduled runs of the workflow.
+ * For a non-recurrent workflow, the new configuration may (or may not) be applied
+ * to the currently running jobs, but it will be applied to the following unscheduled jobs.
+ *
+ * Example:
+ *
+ * WorkflowConfig currentWorkflowConfig = TaskUtil.getWorkflowCfg(_manager, workflow);
+ * WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(currentWorkflowConfig);
+
+ * // make needed changes to the config here
+ * configBuilder.setXXX();
+ *
+ * // update workflow configuration
+ * _driver.updateWorkflow(workflow, configBuilder.build());
+ *
+ * @param workflow
+ * @param newWorkflowConfig
+ */
+ public void updateWorkflow(String workflow, WorkflowConfig newWorkflowConfig) {
+ WorkflowConfig currentConfig =
+ TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, workflow);
+ if (currentConfig == null) {
+ throw new HelixException("Workflow " + workflow + " does not exist!");
+ }
+
+ if (currentConfig.isTerminable()) {
+ throw new HelixException(
+ "Workflow " + workflow + " is terminable, not allow to change its configuration!");
+ }
+
+ _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, workflow),
+ newWorkflowConfig.getResourceConfigMap());
+
+ TaskUtil.invokeRebalance(_accessor, workflow);
+ }
+
+ /**
+ * Creates a new named job queue (workflow)
+ *
+ * @param queue
+ */
+ public void createQueue(JobQueue queue) {
start(queue);
}
- /** Flushes a named job queue */
+ /**
+ * Flushes a named job queue
+ *
+ * @param queueName
+ * @throws Exception
+ */
// TODO: need to make sure the queue is stopped or completed before flush the queue.
- public void flushQueue(String queueName) throws Exception {
+ public void flushQueue(String queueName) {
WorkflowConfig config =
TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
if (config == null) {
@@ -275,7 +330,13 @@ public class TaskDriver {
_propertyStore.update(path, updater, AccessOption.PERSISTENT);
}
- /** Delete a job from an existing named queue, the queue has to be stopped prior to this call */
+ /**
+ * Delete a job from an existing named queue,
+ * the queue has to be stopped prior to this call
+ *
+ * @param queueName
+ * @param jobName
+ */
public void deleteJob(final String queueName, final String jobName) {
WorkflowConfig workflowCfg =
TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
@@ -315,8 +376,12 @@ public class TaskDriver {
}
}
-
- /** delete a job from a scheduled (non-recurrent) queue.*/
+ /**
+ * delete a job from a scheduled (non-recurrent) queue.
+ *
+ * @param queueName
+ * @param jobName
+ */
private void deleteJobFromScheduledQueue(final String queueName, final String jobName) {
WorkflowConfig workflowCfg =
TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
@@ -354,9 +419,7 @@ public class TaskDriver {
_propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
}
- /**
- * Remove the job name from the DAG from the queue configuration
- */
+ /** Remove the job name from the DAG in the queue configuration */
private void removeJobFromDag(final String queueName, final String jobName) {
final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
@@ -410,8 +473,7 @@ public class TaskDriver {
}
}
- /** update queue's property to remove job from JOB_STATES if it is already started.
- */
+ /** update queue's property to remove job from JOB_STATES if it is already started. */
private void removeJobStateFromQueue(final String queueName, final String jobName) {
final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
String queuePropertyPath =
@@ -435,9 +497,16 @@ public class TaskDriver {
}
}
- /** Adds a new job to the end an existing named queue */
- public void enqueueJob(final String queueName, final String jobName, JobConfig.Builder jobBuilder)
- throws Exception {
+ /**
+ * Adds a new job to the end of an existing named queue.
+ *
+ * @param queueName
+ * @param jobName
+ * @param jobBuilder
+ * @throws Exception
+ */
+ public void enqueueJob(final String queueName, final String jobName,
+ JobConfig.Builder jobBuilder) {
// Get the job queue config and capacity
HelixProperty workflowConfig =
_accessor.getProperty(_accessor.keyBuilder().resourceConfig(queueName));
@@ -468,12 +537,12 @@ public class TaskDriver {
JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
Set<String> allNodes = jobDag.getAllNodes();
if (allNodes.size() >= capacity) {
- throw new IllegalStateException("Queue " + queueName + " is at capacity, will not add "
- + jobName);
+ throw new IllegalStateException(
+ "Queue " + queueName + " is at capacity, will not add " + jobName);
}
if (allNodes.contains(namespacedJobName)) {
- throw new IllegalStateException("Could not add to queue " + queueName + ", job "
- + jobName + " already exists");
+ throw new IllegalStateException(
+ "Could not add to queue " + queueName + ", job " + jobName + " already exists");
}
jobDag.addNode(namespacedJobName);
@@ -493,8 +562,8 @@ public class TaskDriver {
try {
currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
} catch (Exception e) {
- throw new IllegalStateException(
- "Could not add job " + jobName + " to queue " + queueName, e);
+ throw new IllegalStateException("Could not add job " + jobName + " to queue " + queueName,
+ e);
}
return currentData;
}
@@ -550,17 +619,29 @@ public class TaskDriver {
}
}
- /** Public method to resume a workflow/queue */
+ /**
+ * Public method to resume a workflow/queue.
+ *
+ * @param workflow
+ */
public void resume(String workflow) {
setWorkflowTargetState(workflow, TargetState.START);
}
- /** Public method to stop a workflow/queue */
+ /**
+ * Public method to stop a workflow/queue.
+ *
+ * @param workflow
+ */
public void stop(String workflow) {
setWorkflowTargetState(workflow, TargetState.STOP);
}
- /** Public method to delete a workflow/queue */
+ /**
+ * Public method to delete a workflow/queue.
+ *
+ * @param workflow
+ */
public void delete(String workflow) {
setWorkflowTargetState(workflow, TargetState.DELETE);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 56fba58..4c81654 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -18,13 +18,14 @@ package org.apache.helix.task;
* specific language governing permissions and limitations
* under the License.
*/
-
+import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
+import org.apache.helix.HelixException;
/**
* Provides a typed interface to workflow level configurations. Validates the configurations.
@@ -44,7 +45,9 @@ public class WorkflowConfig {
public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
/* Member variables */
+ // TODO: jobDag should not be in the workflowConfig.
private final JobDag _jobDag;
+
// _parallelJobs would kind of break the job dependency,
// e.g: if job1 -> job2, but _parallelJobs == 2,
// then job1 and job2 could be scheduled at the same time
@@ -114,9 +117,13 @@ public class WorkflowConfig {
return _scheduleConfig.getStartTime();
}
- public Map<String, String> getResourceConfigMap() throws Exception {
+ public Map<String, String> getResourceConfigMap() {
Map<String, String> cfgMap = new HashMap<String, String>();
- cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());
+ try {
+ cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());
+ } catch (IOException ex) {
+ throw new HelixException("Invalid job dag configuration!", ex);
+ }
cfgMap.put(WorkflowConfig.PARALLEL_JOBS, String.valueOf(getParallelJobs()));
cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(getExpiry()));
cfgMap.put(WorkflowConfig.TARGET_STATE, getTargetState().name());
@@ -153,6 +160,17 @@ public class WorkflowConfig {
return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _isTerminable, _scheduleConfig);
}
+ public Builder() {}
+
+ public Builder(WorkflowConfig workflowConfig) {
+ _taskDag = workflowConfig.getJobDag();
+ _parallelJobs = workflowConfig.getParallelJobs();
+ _targetState = workflowConfig.getTargetState();
+ _expiry = workflowConfig.getExpiry();
+ _isTerminable = workflowConfig.isTerminable();
+ _scheduleConfig = workflowConfig.getScheduleConfig();
+ }
+
public Builder setJobDag(JobDag v) {
_taskDag = v;
return this;
http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 11677b8..06b9751 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -49,22 +49,26 @@ public class TaskTestUtil {
* Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout is
* reached.
* If the task has not reached target state by then, an error is thrown
+ *
* @param workflowResource Resource to poll for completeness
* @throws InterruptedException
*/
public static void pollForWorkflowState(HelixManager manager, String workflowResource,
- TaskState state) throws InterruptedException {
+ TaskState... targetStates) throws InterruptedException {
// Wait for completion.
long st = System.currentTimeMillis();
WorkflowContext ctx;
+ Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
do {
Thread.sleep(100);
ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
- } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
- && System.currentTimeMillis() < st + _default_timeout);
+ } while ((ctx == null || ctx.getWorkflowState() == null || !allowedStates
+ .contains(ctx.getWorkflowState())) && System.currentTimeMillis() < st + _default_timeout);
Assert.assertNotNull(ctx);
- Assert.assertEquals(ctx.getWorkflowState(), state);
+ TaskState workflowState = ctx.getWorkflowState();
+ Assert.assertTrue(allowedStates.contains(workflowState),
+ "expect workflow states: " + allowedStates + " actual workflow state: " + workflowState);
}
/**
@@ -101,10 +105,13 @@ public class TaskTestUtil {
Thread.sleep(100);
ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
}
- while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(ctx.getJobState(jobName)))
+ while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(
+ ctx.getJobState(jobName)))
&& System.currentTimeMillis() < st + _default_timeout);
- Assert.assertNotNull(ctx);
- Assert.assertTrue(allowedStates.contains(ctx.getJobState(jobName)));
+ Assert.assertNotNull(ctx, "Empty job context");
+ TaskState jobState = ctx.getJobState(jobName);
+ Assert.assertTrue(allowedStates.contains(jobState),
+ "expect job states: " + allowedStates + " actual job state: " + jobState);
}
public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,
@@ -127,8 +134,8 @@ public class TaskTestUtil {
long st = System.currentTimeMillis();
WorkflowContext ctx;
do {
- Thread.sleep(100);
ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+ Thread.sleep(100);
} while (ctx == null && System.currentTimeMillis() < st + _default_timeout);
Assert.assertNotNull(ctx);
return ctx;
@@ -228,9 +235,14 @@ public class TaskTestUtil {
}
public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
+ return buildRecurrentJobQueue(jobQueueName, delayStart, 60);
+ }
+
+ public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart,
+ int recurrenInSeconds) {
Map<String, String> cfgMap = new HashMap<String, String>();
cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
- cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
+ cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(recurrenInSeconds));
cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
Calendar cal = Calendar.getInstance();
cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
@@ -238,8 +250,6 @@ public class TaskTestUtil {
cal.set(Calendar.MILLISECOND, 0);
cfgMap.put(WorkflowConfig.START_TIME,
WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
- //cfgMap.put(WorkflowConfig.START_TIME,
- //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
}
@@ -262,8 +272,4 @@ public class TaskTestUtil {
public static JobQueue.Builder buildJobQueue(String jobQueueName) {
return buildJobQueue(jobQueueName, 0);
}
-
- public static boolean pollForParticipantParallelState() {
- return false;
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
index 31e4325..9fd7735 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -68,7 +68,8 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
private HelixManager _manager;
private TaskDriver _driver;
- @BeforeClass public void beforeClass() throws Exception {
+ @BeforeClass
+ public void beforeClass() throws Exception {
String namespace = "/" + CLUSTER_NAME;
if (_gZkClient.exists(namespace)) {
_gZkClient.deleteRecursive(namespace);
@@ -141,7 +142,8 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
}
}
- @Test public void testJobFailsWithMissingTarget() throws Exception {
+ @Test
+ public void testJobFailsWithMissingTarget() throws Exception {
String queueName = TestHelper.getTestMethodName();
// Create a queue
@@ -166,7 +168,8 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED);
}
- @Test public void testJobFailsWithMissingTargetInRunning() throws Exception {
+ @Test
+ public void testJobFailsWithMissingTargetInRunning() throws Exception {
String queueName = TestHelper.getTestMethodName();
// Create a queue
http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
new file mode 100644
index 0000000..fc93392
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
@@ -0,0 +1,220 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.ScheduleConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class TestUpdateWorkflow extends ZkIntegrationTestBase {
+ private static final Logger LOG = Logger.getLogger(TestUpdateWorkflow.class);
+ private static final int n = 5;
+ private static final int START_PORT = 12918;
+ private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+ private static final String TIMEOUT_CONFIG = "Timeout";
+ private static final String TGT_DB = "TestDB";
+ private static final int NUM_PARTITIONS = 20;
+ private static final int NUM_REPLICAS = 3;
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+ private final MockParticipantManager[] _participants = new MockParticipantManager[n];
+ private ClusterControllerManager _controller;
+
+ private HelixManager _manager;
+ private TaskDriver _driver;
+ private ZKHelixDataAccessor _accessor;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
+ }
+
+ _accessor =
+ new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+
+ ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ setupTool.addCluster(CLUSTER_NAME, true);
+ for (int i = 0; i < n; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+
+ // Set up target db
+ setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
+ setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
+
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+ taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new MockTask(context);
+ }
+ });
+
+ // start dummy participants
+ for (int i = 0; i < n; i++) {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+ // Register a Task state model factory.
+ StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+ stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+ taskFactoryReg));
+
+ _participants[i].syncStart();
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ // create cluster manager
+ _manager =
+ HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+ ZK_ADDR);
+ _manager.connect();
+
+ _driver = new TaskDriver(_manager);
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
+ ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ CLUSTER_NAME));
+ Assert.assertTrue(result);
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ _manager.disconnect();
+ _controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ _participants[i].syncStop();
+ }
+ }
+
+ @Test
+ public void testUpdateQueueConfig() throws InterruptedException {
+ String queueName = TestHelper.getTestMethodName();
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 600000);
+ // Create and Enqueue jobs
+ List<String> currentJobNames = new ArrayList<String>();
+ for (int i = 0; i <= 1; i++) {
+ String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
+
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+ String jobName = targetPartition.toLowerCase() + "Job" + i;
+ queueBuild.enqueueJob(jobName, jobConfig);
+ currentJobNames.add(jobName);
+ }
+
+ _driver.start(queueBuild.build());
+
+ WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(_manager, queueName);
+ WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowConfig);
+
+ Calendar startTime = Calendar.getInstance();
+ startTime.set(Calendar.SECOND, startTime.get(Calendar.SECOND) + 1);
+
+ ScheduleConfig scheduleConfig =
+ ScheduleConfig.recurringFromDate(startTime.getTime(), TimeUnit.MINUTES, 2);
+
+ configBuilder.setScheduleConfig(scheduleConfig);
+
+ // ensure current schedule is started
+ String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+ TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.IN_PROGRESS);
+
+ _driver.updateWorkflow(queueName, configBuilder.build());
+
+ // ensure current schedule is completed
+ TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.COMPLETED);
+
+ Thread.sleep(1000);
+
+ wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+ scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+ WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, scheduledQueue);
+
+ Calendar configStartTime = Calendar.getInstance();
+ configStartTime.setTime(wCfg.getStartTime());
+
+ Assert.assertTrue(
+ (startTime.get(Calendar.HOUR_OF_DAY) == configStartTime.get(Calendar.HOUR_OF_DAY) &&
+ startTime.get(Calendar.MINUTE) == configStartTime.get(Calendar.MINUTE) &&
+ startTime.get(Calendar.SECOND) == configStartTime.get(Calendar.SECOND)));
+ }
+}
+