You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/06/22 22:57:37 UTC

[18/50] [abbrv] helix git commit: Add methods for creating WorkflowContext and JobContext for integration test

Add methods for creating WorkflowContext and JobContext for integration test

Add methods for creating WorkflowContext and JobContext for integration test


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/17c923fe
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/17c923fe
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/17c923fe

Branch: refs/heads/master
Commit: 17c923fe82973e7ba9d86fb306737b39f3c97c6a
Parents: 812b83f
Author: Junkai Xue <jx...@linkedin.com>
Authored: Fri Feb 17 16:51:40 2017 -0800
Committer: dasahcc <ju...@gmail.com>
Committed: Sun Mar 12 12:16:34 2017 -0700

----------------------------------------------------------------------
 .../helix/task/DeprecatedTaskRebalancer.java    |  4 +--
 .../org/apache/helix/task/JobRebalancer.java    |  2 +-
 .../java/org/apache/helix/task/TaskUtil.java    |  2 ++
 .../apache/helix/task/WorkflowRebalancer.java   |  2 +-
 .../helix/integration/task/TaskTestUtil.java    | 27 ++++++++++++++++++++
 5 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/17c923fe/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 0a43c0b..14c559c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -142,7 +142,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
 
     // Initialize workflow context if needed
     if (workflowCtx == null) {
-      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
+      workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
       workflowCtx.setStartTime(System.currentTimeMillis());
       LOG.info("Workflow context for " + resourceName + " created!");
     }
@@ -188,7 +188,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
     // Fetch any existing context information from the property store.
     JobContext jobCtx = TaskUtil.getJobContext(_manager, resourceName);
     if (jobCtx == null) {
-      jobCtx = new JobContext(new ZNRecord("TaskContext"));
+      jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
       jobCtx.setStartTime(System.currentTimeMillis());
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/17c923fe/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index bed81cd..dc96351 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -122,7 +122,7 @@ public class JobRebalancer extends TaskRebalancer {
     // Fetch any existing context information from the property store.
     JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName);
     if (jobCtx == null) {
-      jobCtx = new JobContext(new ZNRecord("TaskContext"));
+      jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
       jobCtx.setStartTime(System.currentTimeMillis());
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/17c923fe/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 9d69083..effdd44 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -51,6 +51,8 @@ public class TaskUtil {
   private static final Logger LOG = Logger.getLogger(TaskUtil.class);
   public static final String CONTEXT_NODE = "Context";
   public static final String USER_CONTENT_NODE = "UserContent";
+  public static final String WORKFLOW_CONTEXT_KW = "WorkflowContext";
+  public static final String TASK_CONTEXT_KW = "TaskContext";
 
   /**
    * Parses job resource configurations in Helix into a {@link JobConfig} object.

http://git-wip-us.apache.org/repos/asf/helix/blob/17c923fe/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 36324d8..01b3f6a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -61,7 +61,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflow);
     // Initialize workflow context if needed
     if (workflowCtx == null) {
-      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
+      workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
       workflowCtx.setStartTime(System.currentTimeMillis());
       LOG.debug("Workflow context is created for " + workflow);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/17c923fe/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 6122463..2a22b90 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.ScheduleConfig;
@@ -35,14 +36,17 @@ import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.task.WorkflowRebalancer;
 import org.testng.Assert;
 
 /**
  * Static test utility methods.
  */
 public class TaskTestUtil {
+  public static final String JOB_KW = "JOB";
   private final static int _default_timeout = 2 * 60 * 1000; /* 2 mins */
 
   public static void pollForEmptyJobState(final TaskDriver driver, final String workflowName,
@@ -216,4 +220,27 @@ public class TaskTestUtil {
   public static JobQueue.Builder buildJobQueue(String jobQueueName) {
     return buildJobQueue(jobQueueName, 0, 0);
   }
+
+  public static WorkflowContext buildWorkflowContext(TaskState workflowState, Long startTime,
+      TaskState... jobStates) {
+    WorkflowContext workflowContext =
+        new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
+    workflowContext.setStartTime(startTime == null ? System.currentTimeMillis() : startTime);
+    int jobId = 0;
+    for (TaskState jobstate : jobStates) {
+      workflowContext.setJobState(JOB_KW + jobId++, jobstate);
+    }
+    workflowContext.setWorkflowState(workflowState);
+    return workflowContext;
+  }
+
+  public static JobContext buildJobContext(Long startTime, TaskPartitionState... partitionStates) {
+    JobContext jobContext = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
+    jobContext.setStartTime(startTime == null ? System.currentTimeMillis() : startTime);
+    int partitionId = 0;
+    for (TaskPartitionState partitionState : partitionStates) {
+      jobContext.setPartitionState(partitionId++, partitionState);
+    }
+    return jobContext;
+  }
 }