You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/05/25 01:19:51 UTC
[helix] 17/44: Task Framework code style change
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit a8e2cf7b76a802a5b005d32082699d4c9e39875a
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Mar 29 12:08:07 2019 -0700
Task Framework code style change
This diff includes style changes using Java 8 features.
RB=1613441
BUG=HELIX-1742
G=helix-reviewers
A=jxue
Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
.../apache/helix/task/AbstractTaskDispatcher.java | 110 ++++-----
.../java/org/apache/helix/task/JobDispatcher.java | 21 +-
.../java/org/apache/helix/task/TaskDriver.java | 263 ++++++++++-----------
.../org/apache/helix/task/WorkflowDispatcher.java | 25 +-
.../integration/task/TestQuotaBasedScheduling.java | 45 ++--
.../helix/integration/task/TestTaskRebalancer.java | 27 +--
6 files changed, 224 insertions(+), 267 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 78a7419..698730e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -79,7 +79,7 @@ public abstract class AbstractTaskDispatcher {
// Iterate through all instances
for (String instance : prevInstanceToTaskAssignments.keySet()) {
- assignedPartitions.put(instance, new HashSet<Integer>());
+ assignedPartitions.put(instance, new HashSet<>());
// Set all dropping transitions first. These are tasks coming from Participant disconnects
// that have some active current state (INIT or RUNNING) and the requestedState of DROPPED.
@@ -709,7 +709,7 @@ public abstract class AbstractTaskDispatcher {
private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
Set<Integer> excluded, Set<Integer> throttled, int n) {
- List<Integer> result = new ArrayList<Integer>();
+ List<Integer> result = new ArrayList<>();
for (Integer pId : candidatePartitions) {
if (!excluded.contains(pId)) {
if (result.size() < n) {
@@ -932,8 +932,8 @@ public abstract class AbstractTaskDispatcher {
TaskConfig taskConfig = taskEntry.getValue();
for (String assignableInstanceName : assignableInstanceManager
.getAssignableInstanceNames()) {
- assignableInstanceManager
- .release(assignableInstanceName, taskConfig, quotaType);
+ assignableInstanceManager.release(assignableInstanceName, taskConfig,
+ quotaType);
}
}
}
@@ -1120,9 +1120,9 @@ public abstract class AbstractTaskDispatcher {
}
if (!assignableInstanceManager.hasGlobalCapacity(quotaType)) {
- LOG.info(String
- .format("Job %s not ready to schedule due to not having enough quota for quota type %s",
- job, quotaType));
+ LOG.info(String.format(
+ "Job %s not ready to schedule due to not having enough quota for quota type %s", job,
+ quotaType));
return false;
}
@@ -1210,9 +1210,8 @@ public abstract class AbstractTaskDispatcher {
LOG.debug(
String.format("Finish job %s of workflow %s for runtime job DAG", jobName, workflowName));
} else {
- LOG.warn(String
- .format("Failed to find runtime job DAG for workflow %s and job %s", workflowName,
- jobName));
+ LOG.warn(String.format("Failed to find runtime job DAG for workflow %s and job %s",
+ workflowName, jobName));
}
}
@@ -1228,21 +1227,18 @@ public abstract class AbstractTaskDispatcher {
protected static void reportSubmissionToProcessDelay(BaseControllerDataProvider dataProvider,
final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
final JobConfig jobConfig, final long currentTimestamp) {
- AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() {
- @Override
- public Object call() {
- // Asynchronously update the appropriate JobMonitor
- JobMonitor jobMonitor = clusterStatusMonitor
- .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
- if (jobMonitor == null) {
- return null;
- }
-
- // Compute SubmissionToProcessDelay
- long submissionToProcessDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
- jobMonitor.updateSubmissionToProcessDelayGauge(submissionToProcessDelay);
+ AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> {
+ // Asynchronously update the appropriate JobMonitor
+ JobMonitor jobMonitor = clusterStatusMonitor
+ .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+ if (jobMonitor == null) {
return null;
}
+
+ // Compute SubmissionToProcessDelay
+ long submissionToProcessDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
+ jobMonitor.updateSubmissionToProcessDelayGauge(submissionToProcessDelay);
+ return null;
});
}
@@ -1258,21 +1254,18 @@ public abstract class AbstractTaskDispatcher {
private static void reportSubmissionToScheduleDelay(BaseControllerDataProvider dataProvider,
final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
final JobConfig jobConfig, final long currentTimestamp) {
- AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() {
- @Override
- public Object call() {
- // Asynchronously update the appropriate JobMonitor
- JobMonitor jobMonitor = clusterStatusMonitor
- .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
- if (jobMonitor == null) {
- return null;
- }
-
- // Compute SubmissionToScheduleDelay
- long submissionToStartDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
- jobMonitor.updateSubmissionToScheduleDelayGauge(submissionToStartDelay);
+ AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> {
+ // Asynchronously update the appropriate JobMonitor
+ JobMonitor jobMonitor = clusterStatusMonitor
+ .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+ if (jobMonitor == null) {
return null;
}
+
+ // Compute SubmissionToScheduleDelay
+ long submissionToStartDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
+ jobMonitor.updateSubmissionToScheduleDelayGauge(submissionToStartDelay);
+ return null;
});
}
@@ -1288,32 +1281,29 @@ public abstract class AbstractTaskDispatcher {
private static void reportControllerInducedDelay(BaseControllerDataProvider dataProvider,
final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
final JobConfig jobConfig, final long currentTimestamp) {
- AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() {
- @Override
- public Object call() {
- // Asynchronously update the appropriate JobMonitor
- JobMonitor jobMonitor = clusterStatusMonitor
- .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
- if (jobMonitor == null) {
- return null;
- }
-
- // Compute ControllerInducedDelay only if the workload is a test load
- // NOTE: this metric cannot be computed for general user-submitted workloads because
- // the actual runtime of the tasks vary, and there could exist multiple tasks per
- // job
- // NOTE: a test workload will have the "latency" field in the mapField of the
- // JobConfig (taskConfig)
- String firstTask = jobConfig.getTaskConfigMap().keySet().iterator().next();
- if (jobConfig.getTaskConfig(firstTask).getConfigMap().containsKey(TASK_LATENCY_TAG)) {
- long taskDuration =
- Long.valueOf(jobConfig.getTaskConfig(firstTask).getConfigMap().get(TASK_LATENCY_TAG));
- long controllerInducedDelay =
- currentTimestamp - jobConfig.getStat().getCreationTime() - taskDuration;
- jobMonitor.updateControllerInducedDelayGauge(controllerInducedDelay);
- }
+ AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> {
+ // Asynchronously update the appropriate JobMonitor
+ JobMonitor jobMonitor = clusterStatusMonitor
+ .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+ if (jobMonitor == null) {
return null;
}
+
+ // Compute ControllerInducedDelay only if the workload is a test load
+ // NOTE: this metric cannot be computed for general user-submitted workloads because
+ // the actual runtime of the tasks vary, and there could exist multiple tasks per
+ // job
+ // NOTE: a test workload will have the "latency" field in the mapField of the
+ // JobConfig (taskConfig)
+ String firstTask = jobConfig.getTaskConfigMap().keySet().iterator().next();
+ if (jobConfig.getTaskConfig(firstTask).getConfigMap().containsKey(TASK_LATENCY_TAG)) {
+ long taskDuration =
+ Long.valueOf(jobConfig.getTaskConfig(firstTask).getConfigMap().get(TASK_LATENCY_TAG));
+ long controllerInducedDelay =
+ currentTimestamp - jobConfig.getStat().getCreationTime() - taskDuration;
+ jobMonitor.updateControllerInducedDelayGauge(controllerInducedDelay);
+ }
+ return null;
});
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 56e04ac..c8cf09a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -29,10 +29,8 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.concurrent.Callable;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
@@ -157,8 +155,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
jobState = workflowCtx.getJobState(jobName);
workflowState = workflowCtx.getWorkflowState();
- if (INTERMEDIATE_STATES.contains(jobState) && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
- || TaskState.TIMED_OUT.equals(workflowState))) {
+ if (INTERMEDIATE_STATES.contains(jobState)
+ && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
+ || TaskState.TIMED_OUT.equals(workflowState))) {
jobState = TaskState.TIMING_OUT;
workflowCtx.setJobState(jobName, TaskState.TIMING_OUT);
} else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) {
@@ -272,7 +271,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.TASK_ABORTED.name()));
}
Partition partition = new Partition(pName(jobResource, pId));
- Message pendingMessage = currStateOutput.getPendingMessage(jobResource, partition, instance);
+ Message pendingMessage =
+ currStateOutput.getPendingMessage(jobResource, partition, instance);
// While job is failing, if the task is pending on INIT->RUNNING, set it back to INIT,
// so that Helix will cancel the transition.
if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT && pendingMessage != null) {
@@ -303,7 +303,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
// can be dropped(note that Helix doesn't track whether the drop is success or not).
if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) {
handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg);
- finishJobInRuntimeJobDag(cache.getTaskDataCache(), workflowConfig.getWorkflowId(), jobResource);
+ finishJobInRuntimeJobDag(cache.getTaskDataCache(), workflowConfig.getWorkflowId(),
+ jobResource);
return buildEmptyAssignment(jobResource, currStateOutput);
}
@@ -340,8 +341,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
TaskPartitionState state = jobContext.getPartitionState(pId);
Partition partition = new Partition(pName(jobResource, pId));
String instance = jobContext.getAssignedParticipant(pId);
- Message pendingMessage = currentStateOutput.getPendingMessage(jobResource, partition,
- instance);
+ Message pendingMessage =
+ currentStateOutput.getPendingMessage(jobResource, partition, instance);
// If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished
if (state == TaskPartitionState.RUNNING
|| (state == TaskPartitionState.INIT && pendingMessage != null)) {
@@ -389,7 +390,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
Map<String, Set<Integer>> tasksToDrop) {
Map<String, SortedSet<Integer>> result = new HashMap<>();
for (String instance : liveInstances) {
- result.put(instance, new TreeSet<Integer>());
+ result.put(instance, new TreeSet<>());
}
// First, add all task partitions from JobContext
@@ -438,7 +439,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
// Check if this is a dropping transition
if (requestedState != null && requestedState.equals(TaskPartitionState.DROPPED.name())) {
if (!tasksToDrop.containsKey(instance)) {
- tasksToDrop.put(instance, new HashSet<Integer>());
+ tasksToDrop.put(instance, new HashSet<>());
}
tasksToDrop.get(instance).add(pId);
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 5f4ac14..2f0be85 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -71,7 +71,6 @@ public class TaskDriver {
/** For logging */
private static final Logger LOG = LoggerFactory.getLogger(TaskDriver.class);
-
/** Default time out for monitoring workflow or job state */
private final static int _defaultTimeout = 3 * 60 * 1000; /* 3 mins */
@@ -93,20 +92,21 @@ public class TaskDriver {
private final HelixAdmin _admin;
private final String _clusterName;
-
public TaskDriver(HelixManager manager) {
this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(),
manager.getHelixPropertyStore(), manager.getClusterName());
}
public TaskDriver(HelixZkClient client, String clusterName) {
- this(client, new ZkBaseDataAccessor<ZNRecord>(client), clusterName);
+ this(client, new ZkBaseDataAccessor<>(client), clusterName);
}
- public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
+ public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor,
+ String clusterName) {
this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
- new ZkHelixPropertyStore<>(baseAccessor,
- PropertyPathBuilder.propertyStore(clusterName), null), clusterName);
+ new ZkHelixPropertyStore<>(baseAccessor, PropertyPathBuilder.propertyStore(clusterName),
+ null),
+ clusterName);
}
@Deprecated
@@ -123,9 +123,8 @@ public class TaskDriver {
_clusterName = clusterName;
}
-
- /** Schedules a new workflow
- *
+ /**
+ * Schedules a new workflow
* @param flow
*/
public void start(Workflow flow) {
@@ -137,7 +136,7 @@ public class TaskDriver {
WorkflowConfig newWorkflowConfig =
new WorkflowConfig.Builder(flow.getWorkflowConfig()).setWorkflowId(flow.getName()).build();
- Map<String, String> jobTypes = new HashMap<String, String>();
+ Map<String, String> jobTypes = new HashMap<>();
// add all job configs.
for (String job : flow.getJobConfigs().keySet()) {
JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
@@ -196,13 +195,12 @@ public class TaskDriver {
.setSimpleField(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name(), workflow);
}
if (workflow == null || !workflow.equals(newWorkflowConfig.getWorkflowId())) {
- throw new HelixException(String
- .format("Workflow name {%s} does not match the workflow Id from WorkflowConfig {%s}",
- workflow, newWorkflowConfig.getWorkflowId()));
+ throw new HelixException(String.format(
+ "Workflow name {%s} does not match the workflow Id from WorkflowConfig {%s}", workflow,
+ newWorkflowConfig.getWorkflowId()));
}
- WorkflowConfig currentConfig =
- TaskUtil.getWorkflowConfig(_accessor, workflow);
+ WorkflowConfig currentConfig = TaskUtil.getWorkflowConfig(_accessor, workflow);
if (currentConfig == null) {
throw new HelixException("Workflow " + workflow + " does not exist!");
}
@@ -223,7 +221,6 @@ public class TaskDriver {
/**
* Creates a new named job queue (workflow)
- *
* @param queue
*/
public void createQueue(JobQueue queue) {
@@ -233,7 +230,6 @@ public class TaskDriver {
/**
* Remove all completed or failed jobs in a job queue
* Same as {@link #cleanupQueue(String)}
- *
* @param queue name of the queue
* @throws Exception
*/
@@ -256,7 +252,8 @@ public class TaskDriver {
* the queue has to be stopped prior to this call
* @param queue queue name
* @param job job name, denamespaced
- * @param forceDelete
+ * @param forceDelete CAUTION: if set to true, all of the job's related zk nodes will
+ * be cleaned up from zookeeper even if its workflow information cannot be found.
*/
public void deleteJob(final String queue, final String job, boolean forceDelete) {
deleteNamespacedJob(queue, TaskUtil.getNamespacedJobName(queue, job), forceDelete);
@@ -344,20 +341,17 @@ public class TaskDriver {
/**
* Adds a new job to the end an existing named queue.
- *
* @param queue
* @param job
* @param jobBuilder
* @throws Exception
*/
- public void enqueueJob(final String queue, final String job,
- JobConfig.Builder jobBuilder) {
+ public void enqueueJob(final String queue, final String job, JobConfig.Builder jobBuilder) {
enqueueJobs(queue, Collections.singletonList(job), Collections.singletonList(jobBuilder));
}
/**
* Batch add jobs to queues that garantee
- *
* @param queue
* @param jobs
* @param jobBuilders
@@ -365,7 +359,6 @@ public class TaskDriver {
public void enqueueJobs(final String queue, final List<String> jobs,
final List<JobConfig.Builder> jobBuilders) {
-
// Get the job queue config and capacity
WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowConfig == null) {
@@ -419,78 +412,75 @@ public class TaskDriver {
}
// update the job dag to append the job to the end of the queue.
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- if (currentData == null) {
- // For some reason, the WorkflowConfig for this JobQueue doesn't exist
- // In this case, we cannot proceed and must alert the user
- throw new HelixException(
- String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!", queue));
- }
+ DataUpdater<ZNRecord> updater = currentData -> {
+ if (currentData == null) {
+ // For some reason, the WorkflowConfig for this JobQueue doesn't exist
+ // In this case, we cannot proceed and must alert the user
+ throw new HelixException(
+ String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!", queue));
+ }
+
+ // Add the node to the existing DAG
+ JobDag jobDag = JobDag
+ .fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
+ Set<String> allNodes = jobDag.getAllNodes();
+ if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) {
+ throw new IllegalStateException(
+ String.format("Queue %s already reaches its max capacity %f, failed to add %s", queue,
+ capacity, jobs.toString()));
+ }
- // Add the node to the existing DAG
- JobDag jobDag = JobDag.fromJson(
- currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
- Set<String> allNodes = jobDag.getAllNodes();
- if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) {
+ String lastNodeName = null;
+ for (int i = 0; i < namespacedJobNames.size(); i++) {
+ String namespacedJobName = namespacedJobNames.get(i);
+ if (allNodes.contains(namespacedJobName)) {
throw new IllegalStateException(String
- .format("Queue %s already reaches its max capacity %f, failed to add %s", queue,
- capacity, jobs.toString()));
+ .format("Could not add to queue %s, job %s already exists", queue, jobs.get(i)));
}
-
- String lastNodeName = null;
- for (int i = 0; i < namespacedJobNames.size(); i++) {
- String namespacedJobName = namespacedJobNames.get(i);
- if (allNodes.contains(namespacedJobName)) {
- throw new IllegalStateException(String
- .format("Could not add to queue %s, job %s already exists", queue, jobs.get(i)));
- }
- jobDag.addNode(namespacedJobName);
-
- // Add the node to the end of the queue
- String candidate = null;
- if (lastNodeName == null) {
- for (String node : allNodes) {
- if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
- candidate = node;
- break;
- }
+ jobDag.addNode(namespacedJobName);
+
+ // Add the node to the end of the queue
+ String candidate = null;
+ if (lastNodeName == null) {
+ for (String node : allNodes) {
+ if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
+ candidate = node;
+ break;
}
- } else {
- candidate = lastNodeName;
- }
- if (candidate != null) {
- jobDag.addParentToChild(candidate, namespacedJobName);
- lastNodeName = namespacedJobName;
}
+ } else {
+ candidate = lastNodeName;
+ }
+ if (candidate != null) {
+ jobDag.addParentToChild(candidate, namespacedJobName);
+ lastNodeName = namespacedJobName;
}
+ }
- // Add job type if job type is not null
- Map<String, String> jobTypes =
- currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
- for (String jobType : jobTypeList) {
- if (jobType != null) {
- if (jobTypes == null) {
- jobTypes = new HashMap<>();
- }
- jobTypes.put(queue, jobType);
+ // Add job type if job type is not null
+ Map<String, String> jobTypes =
+ currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
+ for (String jobType : jobTypeList) {
+ if (jobType != null) {
+ if (jobTypes == null) {
+ jobTypes = new HashMap<>();
}
+ jobTypes.put(queue, jobType);
}
+ }
- if (jobTypes != null) {
- currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
- }
- // Save the updated DAG
- try {
- currentData
- .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
- } catch (Exception e) {
- throw new IllegalStateException(
- String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e);
- }
- return currentData;
+ if (jobTypes != null) {
+ currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
}
+ // Save the updated DAG
+ try {
+ currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(),
+ jobDag.toJson());
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e);
+ }
+ return currentData;
};
String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
@@ -540,11 +530,11 @@ public class TaskDriver {
throw new IllegalStateException("Queue " + queue + " does not have a valid work state!");
}
- Set<String> jobs = new HashSet<String>();
+ Set<String> jobs = new HashSet<>();
for (String jobNode : workflowConfig.getJobDag().getAllNodes()) {
TaskState curState = wCtx.getJobState(jobNode);
- if (curState != null && (curState == TaskState.ABORTED || curState == TaskState.COMPLETED
- || curState == TaskState.FAILED)) {
+ if (curState != null && curState == TaskState.ABORTED || curState == TaskState.COMPLETED
+ || curState == TaskState.FAILED) {
jobs.add(jobNode);
}
}
@@ -558,8 +548,7 @@ public class TaskDriver {
_admin.addResource(_clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME);
IdealState is = buildWorkflowIdealState(workflow);
- TaskUtil
- .createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE));
+ TaskUtil.createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE));
_admin.setResourceIdealState(_clusterName, workflow, is);
}
@@ -576,12 +565,12 @@ public class TaskDriver {
private IdealState buildWorkflowIdealState(String workflow) {
CustomModeISBuilder IsBuilder = new CustomModeISBuilder(workflow);
- IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK).setNumReplica(1)
- .setNumPartitions(1).setStateModel(TaskConstants.STATE_MODEL_NAME).disableExternalView();
+ IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK).setNumReplica(1).setNumPartitions(1)
+ .setStateModel(TaskConstants.STATE_MODEL_NAME).disableExternalView();
IdealState is = IsBuilder.build();
- is.getRecord().setListField(workflow, new ArrayList<String>());
- is.getRecord().setMapField(workflow, new HashMap<String, String>());
+ is.getRecord().setListField(workflow, new ArrayList<>());
+ is.getRecord().setMapField(workflow, new HashMap<>());
is.setRebalancerClassName(WorkflowRebalancer.class.getName());
return is;
@@ -611,23 +600,19 @@ public class TaskDriver {
/**
* Public async method to stop a workflow/queue.
- *
* This call only send STOP command to Helix, it does not check
* whether the workflow (all jobs) has been stopped yet.
- *
* @param workflow
*/
- public void stop(String workflow) throws InterruptedException {
+ public void stop(String workflow) {
setWorkflowTargetState(workflow, TargetState.STOP);
}
/**
* Public sync method to stop a workflow/queue with timeout
- *
* Basically the workflow and all of its jobs has been stopped if this method return success.
- *
- * @param workflow The workflow name
- * @param timeout The timeout for stopping workflow/queue in milisecond
+ * @param workflow The workflow name
+ * @param timeout The timeout for stopping workflow/queue in milliseconds
*/
public void waitToStop(String workflow, long timeout) throws InterruptedException {
setWorkflowTargetState(workflow, TargetState.STOP);
@@ -636,7 +621,8 @@ public class TaskDriver {
while (System.currentTimeMillis() <= endTime) {
WorkflowContext workflowContext = getWorkflowContext(workflow);
- if (workflowContext == null || TaskState.IN_PROGRESS.equals(workflowContext.getWorkflowState())) {
+ if (workflowContext == null
+ || TaskState.IN_PROGRESS.equals(workflowContext.getWorkflowState())) {
Thread.sleep(1000);
} else {
// Successfully stopped
@@ -651,7 +637,6 @@ public class TaskDriver {
/**
* Public method to delete a workflow/queue.
- *
* @param workflow
*/
public void delete(String workflow) {
@@ -660,11 +645,10 @@ public class TaskDriver {
/**
* Public method to delete a workflow/queue.
- *
* @param workflow
* @param forceDelete, CAUTION: if set true, workflow and all of its jobs' related zk nodes will
- * be clean up immediately from zookeeper, no matter whether there are jobs
- * are running or not.
+ * be cleaned up immediately from zookeeper, regardless of whether any jobs
+ * are running.
*/
public void delete(String workflow, boolean forceDelete) {
WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
@@ -693,7 +677,8 @@ public class TaskDriver {
private void removeWorkflowFromZK(String workflow) {
Set<String> jobSet = new HashSet<>();
- // Note that even WorkflowConfig is null, if WorkflowContext exists, still need to remove workflow
+ // Note that even if WorkflowConfig is null, the workflow still needs to be removed if
+ // WorkflowContext exists
WorkflowConfig wCfg = TaskUtil.getWorkflowConfig(_accessor, workflow);
if (wCfg != null) {
jobSet.addAll(wCfg.getJobDag().getAllNodes());
@@ -709,11 +694,11 @@ public class TaskDriver {
* Public synchronized method to wait for a delete operation to fully complete with timeout.
* When this method returns, it means that a queue (workflow) has been completely deleted, meaning
* its IdealState, WorkflowConfig, and WorkflowContext have all been deleted.
- *
* @param workflow workflow/jobqueue name
* @param timeout duration to give to delete operation to completion
*/
- public void deleteAndWaitForCompletion(String workflow, long timeout) throws InterruptedException {
+ public void deleteAndWaitForCompletion(String workflow, long timeout)
+ throws InterruptedException {
delete(workflow);
long endTime = System.currentTimeMillis() + timeout;
@@ -746,9 +731,11 @@ public class TaskDriver {
if (baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) {
failed.append("WorkflowContext ");
}
- throw new HelixException(String
- .format("Failed to delete the workflow/queue %s within %d milliseconds. "
- + "The following components still remain: %s", workflow, timeout, failed.toString()));
+ throw new HelixException(
+ String.format(
+ "Failed to delete the workflow/queue %s within %d milliseconds. "
+ + "The following components still remain: %s",
+ workflow, timeout, failed.toString()));
}
/**
@@ -780,30 +767,27 @@ public class TaskDriver {
}
WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, workflow);
- if (state != TargetState.DELETE && workflowContext != null &&
- workflowContext.getFinishTime() != WorkflowContext.UNFINISHED) {
+ if (state != TargetState.DELETE && workflowContext != null
+ && workflowContext.getFinishTime() != WorkflowContext.UNFINISHED) {
// Should not update target state for completed workflow
LOG.info("Workflow " + workflow + " is already completed, skip to update its target state "
+ state);
return;
}
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
- @Override public ZNRecord update(ZNRecord currentData) {
- if (currentData != null) {
- currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
- state.name());
- } else {
- LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is "
- + currentData);
- }
- return currentData;
+ DataUpdater<ZNRecord> updater = currentData -> {
+ if (currentData != null) {
+ currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
+ state.name());
+ } else {
+ LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is null.");
}
+ return currentData;
};
PropertyKey workflowConfigKey = TaskUtil.getWorkflowConfigKey(_accessor, workflow);
- _accessor.getBaseDataAccessor()
- .update(workflowConfigKey.getPath(), updater, AccessOption.PERSISTENT);
+ _accessor.getBaseDataAccessor().update(workflowConfigKey.getPath(), updater,
+ AccessOption.PERSISTENT);
RebalanceScheduler.invokeRebalance(_accessor, workflow);
}
@@ -841,11 +825,10 @@ public class TaskDriver {
/**
* Batch get the configurations of all workflows in this cluster.
- *
* @return
*/
public Map<String, WorkflowConfig> getWorkflows() {
- Map<String, WorkflowConfig> workflowConfigMap = new HashMap<String, WorkflowConfig>();
+ Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>();
Map<String, ResourceConfig> resourceConfigMap =
_accessor.getChildValuesMap(_accessor.keyBuilder().resourceConfigs());
@@ -865,7 +848,6 @@ public class TaskDriver {
* This call will be blocked until either workflow reaches to one of the state specified
* in the arguments, or timeout happens. If timeout happens, then it will throw a HelixException
* Otherwise, it will return current workflow state
- *
* @param workflowName The workflow to be monitored
* @param timeout A long integer presents the time out, in milliseconds
* @param targetStates Specified states that user would like to stop monitoring
@@ -877,14 +859,15 @@ public class TaskDriver {
// Wait for completion.
long st = System.currentTimeMillis();
WorkflowContext ctx;
- Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
+ Set<TaskState> allowedStates = new HashSet<>(Arrays.asList(targetStates));
long timeToSleep = timeout > 100L ? 100L : timeout;
do {
Thread.sleep(timeToSleep);
ctx = getWorkflowContext(workflowName);
- } while ((ctx == null || ctx.getWorkflowState() == null || !allowedStates
- .contains(ctx.getWorkflowState())) && System.currentTimeMillis() < st + timeout);
+ } while ((ctx == null || ctx.getWorkflowState() == null
+ || !allowedStates.contains(ctx.getWorkflowState()))
+ && System.currentTimeMillis() < st + timeout);
if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) {
throw new HelixException(String.format(
@@ -900,7 +883,6 @@ public class TaskDriver {
* This is a wrapper function that set default time out for monitoring workflow in 2 MINUTES.
* If timeout happens, then it will throw a HelixException, Otherwise, it will return
* current job state.
- *
* @param workflowName The workflow to be monitored
* @param targetStates Specified states that user would like to stop monitoring
* @return A TaskState, which is current workflow state
@@ -915,7 +897,6 @@ public class TaskDriver {
* This call will be blocked until either specified job reaches to one of the state
* in the arguments, or timeout happens. If timeout happens, then it will throw a HelixException
* Otherwise, it will return current job state
- *
* @param workflowName The workflow that contains the job to monitor
* @param jobName The specified job to monitor
* @param timeout A long integer presents the time out, in milliseconds
@@ -952,8 +933,9 @@ public class TaskDriver {
do {
Thread.sleep(timeToSleep);
ctx = getWorkflowContext(workflowName);
- } while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates
- .contains(ctx.getJobState(jobName))) && System.currentTimeMillis() < st + timeout);
+ } while ((ctx == null || ctx.getJobState(jobName) == null
+ || !allowedStates.contains(ctx.getJobState(jobName)))
+ && System.currentTimeMillis() < st + timeout);
if (ctx == null || !allowedStates.contains(ctx.getJobState(jobName))) {
throw new HelixException(
@@ -968,7 +950,6 @@ public class TaskDriver {
* This is a wrapper function for monitoring job state with default timeout 2 MINUTES.
* If timeout happens, then it will throw a HelixException, Otherwise, it will return
* current job state
- *
* @param workflowName The workflow that contains the job to monitor
* @param jobName The specified job to monitor
* @param states Specified states that user would like to stop monitoring
@@ -981,12 +962,12 @@ public class TaskDriver {
}
/**
- * This function returns the timestamp of the very last task that was scheduled. It is provided to help determine
+ * This function returns the timestamp of the very last task that was scheduled. It is provided to
+ * help determine
* whether a given Workflow/Job/Task is stuck.
- *
* @param workflowName The name of the workflow
* @return timestamp of the most recent job scheduled.
- * -1L if timestamp is not set (either nothing is scheduled or no start time recorded).
+ * -1L if timestamp is not set (either nothing is scheduled or no start time recorded).
*/
public long getLastScheduledTaskTimestamp(String workflowName) {
return getLastScheduledTaskExecutionInfo(workflowName).getStartTimeStamp();
@@ -998,7 +979,6 @@ public class TaskDriver {
Integer taskPartitionIndex = null;
TaskPartitionState state = null;
-
WorkflowContext workflowContext = getWorkflowContext(workflowName);
if (workflowContext != null) {
Map<String, TaskState> allJobStates = workflowContext.getJobStates();
@@ -1035,9 +1015,8 @@ public class TaskDriver {
* @param taskName name of task. Optional if scope is WORKFLOW or JOB
* @return null if key-value pair not found or this content store does not exist. Otherwise,
* return a String
- *
* @deprecated use the following equivalents: {@link #getWorkflowUserContentMap(String)},
- * {@link #getJobUserContentMap(String, String)},
+ * {@link #getJobUserContentMap(String, String)},
* @{{@link #getTaskContentMap(String, String, String)}}
*/
@Deprecated
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 51b21eb..83a790f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -69,7 +69,9 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
// Split it into status update and assign. But there are couple of data need
// to pass around.
- public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleOutput) {
+ public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg,
+ WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput,
+ BestPossibleStateOutput bestPossibleOutput) {
// Fetch workflow configuration and context
if (workflowCfg == null) {
@@ -93,7 +95,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
// If timeout point has already been passed, it will not be scheduled
scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
- if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
+ if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState())
+ && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
@@ -107,7 +110,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
// Step 3: handle workflow that should STOP
// For workflows that already reached final states, STOP should not take into effect
- if (!finalStates.contains(workflowCtx.getWorkflowState()) && TargetState.STOP.equals(targetState)) {
+ if (!finalStates.contains(workflowCtx.getWorkflowState())
+ && TargetState.STOP.equals(targetState)) {
LOG.info("Workflow " + workflow + "is marked as stopped.");
if (isWorkflowStopped(workflowCtx, workflowCfg)) {
workflowCtx.setWorkflowState(TaskState.STOPPED);
@@ -202,8 +206,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
- public WorkflowContext getOrInitializeWorkflowContext(
- String workflowName, TaskDataCache cache) {
+ public WorkflowContext getOrInitializeWorkflowContext(String workflowName, TaskDataCache cache) {
WorkflowContext workflowCtx = cache.getWorkflowContext(workflowName);
if (workflowCtx == null) {
workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
@@ -380,8 +383,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
jobIS = builder.build();
for (int i = 0; i < numPartitions; i++) {
- jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());
- jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>());
+ jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<>());
+ jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<>());
}
jobIS.setRebalancerClassName(JobRebalancer.class.getName());
admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS);
@@ -443,7 +446,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to start workflow " + newWorkflowName);
}
- if (lastScheduled == null || !newWorkflowName.equals(lastScheduled)) {
+ if (!newWorkflowName.equals(lastScheduled)) {
Workflow clonedWf =
cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
TaskDriver driver = new TaskDriver(_manager);
@@ -453,7 +456,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
driver.start(clonedWf);
} catch (Exception e) {
LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
- _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED);
+ _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(),
+ TaskState.FAILED);
}
}
// Persist workflow start regardless of success to avoid retrying and failing
@@ -582,7 +586,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
// and jobs will rescheduled again.
removeContextsAndPreviousAssignment(workflow, jobs, _clusterDataCache.getTaskDataCache());
}
- } else {
+ } else {
LOG.info("Did not clean up workflow " + workflow
+ " because neither the workflow is non-terminable nor is set to DELETE.");
}
@@ -598,5 +602,4 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
}
cache.removeContext(workflow);
}
-
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
index 6080399..1e06269 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
@@ -37,7 +37,6 @@ import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
@@ -59,8 +58,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
private static final String JOB_COMMAND = "DummyCommand";
private Map<String, String> _jobCommandMap;
private Map<String, Integer> _quotaTypeExecutionCount = new ConcurrentHashMap<>();
- private Set<String> _availableQuotaTypes =
- Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ private Set<String> _availableQuotaTypes = Collections.newSetFromMap(new ConcurrentHashMap<>());
private boolean _finishTask = false;
@BeforeClass
@@ -87,24 +85,9 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
// Set task callbacks
Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
- TaskFactory shortTaskFactory = new TaskFactory() {
- @Override
- public Task createNewTask(TaskCallbackContext context) {
- return new ShortTask(context, instanceName);
- }
- };
- TaskFactory longTaskFactory = new TaskFactory() {
- @Override
- public Task createNewTask(TaskCallbackContext context) {
- return new LongTask(context, instanceName);
- }
- };
- TaskFactory failTaskFactory = new TaskFactory() {
- @Override
- public Task createNewTask(TaskCallbackContext context) {
- return new FailTask(context, instanceName);
- }
- };
+ TaskFactory shortTaskFactory = context -> new ShortTask(context, instanceName);
+ TaskFactory longTaskFactory = context -> new LongTask(context, instanceName);
+ TaskFactory failTaskFactory = context -> new FailTask(context, instanceName);
taskFactoryReg.put("ShortTask", shortTaskFactory);
taskFactoryReg.put("LongTask", longTaskFactory);
taskFactoryReg.put("FailTask", failTaskFactory);
@@ -155,7 +138,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
for (int i = 0; i < 10; i++) {
List<TaskConfig> taskConfigs = new ArrayList<>();
- taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+ taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
.addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap);
workflowBuilder.addJob("JOB" + i, jobConfigBulider);
@@ -193,7 +176,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
for (int i = 0; i < 10; i++) {
List<TaskConfig> taskConfigs = new ArrayList<>();
- taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+ taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
JobConfig.Builder jobConfigBulider =
new JobConfig.Builder().setCommand(JOB_COMMAND).addTaskConfigs(taskConfigs)
.setJobCommandConfigMap(_jobCommandMap).setJobType("UNDEFINED");
@@ -231,7 +214,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
for (int i = 0; i < 5; i++) {
List<TaskConfig> taskConfigs = new ArrayList<>();
- taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+ taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
.addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("A");
workflowBuilder.addJob("JOB" + i, jobConfigBulider);
@@ -239,7 +222,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
for (int i = 5; i < 10; i++) {
List<TaskConfig> taskConfigs = new ArrayList<>();
- taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+ taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
.addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("B");
workflowBuilder.addJob("JOB" + i, jobConfigBulider);
@@ -283,8 +266,10 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
}
// Test that the next two are not executing
- JobContext context_2 = _driver.getJobContext("testQuotaConfigChange_2_testQuotaConfigChange_2_0");
- JobContext context_3 = _driver.getJobContext("testQuotaConfigChange_3_testQuotaConfigChange_3_0");
+ JobContext context_2 =
+ _driver.getJobContext("testQuotaConfigChange_2_testQuotaConfigChange_2_0");
+ JobContext context_3 =
+ _driver.getJobContext("testQuotaConfigChange_3_testQuotaConfigChange_3_0");
Assert.assertNull(context_2.getPartitionState(0));
Assert.assertNull(context_3.getPartitionState(0));
@@ -493,7 +478,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
for (int i = 0; i < numWorkflows; i++) {
String workflowName = workflowNames.get(i);
TaskState state = (i % 3 == 1) ? TaskState.FAILED : TaskState.COMPLETED;
- Assert.assertEquals(_driver.getWorkflowContext(_manager, workflowName).getWorkflowState(),
+ Assert.assertEquals(TaskDriver.getWorkflowContext(_manager, workflowName).getWorkflowState(),
state);
}
@@ -536,7 +521,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
// First run some jobs with quotaType A
List<TaskConfig> taskConfigs = new ArrayList<>();
- taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+ taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
.addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap).setJobType("A");
@@ -553,7 +538,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
// Run some jobs with quotaType B
// First run some jobs with quotaType A
taskConfigs = new ArrayList<>();
- taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+ taskConfigs.add(new TaskConfig("ShortTask", new HashMap<>()));
jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND).addTaskConfigs(taskConfigs)
.setJobCommandConfigMap(_jobCommandMap).setJobType("B");
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index e12b4a9..6de1d3f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -63,8 +63,7 @@ public class TestTaskRebalancer extends TaskTestBase {
JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
jobBuilder.setJobCommandConfigMap(commandConfig);
- Workflow flow = WorkflowGenerator
- .generateSingleJobWorkflowBuilder(jobName, jobBuilder)
+ Workflow flow = WorkflowGenerator.generateSingleJobWorkflowBuilder(jobName, jobBuilder)
.setExpiry(expiry).build();
_driver.start(flow);
@@ -77,8 +76,8 @@ public class TestTaskRebalancer extends TaskTestBase {
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobName);
// Ensure context and config exist
- Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
- AccessOption.PERSISTENT));
+ Assert.assertTrue(
+ _manager.getHelixPropertyStore().exists(workflowPropStoreKey, AccessOption.PERSISTENT));
Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
// Wait for job to finish and expire
@@ -86,8 +85,8 @@ public class TestTaskRebalancer extends TaskTestBase {
Thread.sleep(expiry + 100);
// Ensure workflow config and context were cleaned up by now
- Assert.assertFalse(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
- AccessOption.PERSISTENT));
+ Assert.assertFalse(
+ _manager.getHelixPropertyStore().exists(workflowPropStoreKey, AccessOption.PERSISTENT));
Assert.assertNull(accessor.getProperty(workflowCfgKey));
}
@@ -120,7 +119,8 @@ public class TestTaskRebalancer extends TaskTestBase {
}
}
- @Test public void partitionSet() throws Exception {
+ @Test
+ public void partitionSet() throws Exception {
final String jobResource = "partitionSet";
ImmutableList<String> targetPartitions =
ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13");
@@ -170,7 +170,8 @@ public class TestTaskRebalancer extends TaskTestBase {
}
}
- @Test public void timeouts() throws Exception {
+ @Test
+ public void timeouts() throws Exception {
final String jobResource = "timeouts";
JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
@@ -216,12 +217,10 @@ public class TestTaskRebalancer extends TaskTestBase {
// Enqueue jobs
Set<String> master = Sets.newHashSet("MASTER");
Set<String> slave = Sets.newHashSet("SLAVE");
- JobConfig.Builder job1 =
- new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
- JobConfig.Builder job2 =
- new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
+ JobConfig.Builder job1 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
+ JobConfig.Builder job2 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
_driver.enqueueJob(queueName, "masterJob", job1);
_driver.enqueueJob(queueName, "slaveJob", job2);