You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/09/11 05:56:47 UTC

[1/2] helix git commit: Support Job Level Expiration

Repository: helix
Updated Branches:
  refs/heads/master 50d65774f -> 78ed261e7


Support Job Level Expiration

This request comes from the issue where a JobQueue fills up with jobs in final states. We provided an API for cleaning up final-state jobs; this RB performs the cleanup through the Helix Controller, which is the proper approach.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/361fdc9c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/361fdc9c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/361fdc9c

Branch: refs/heads/master
Commit: 361fdc9c44beaae894fe102386ae029b71068a16
Parents: 50d6577
Author: Junkai Xue <jx...@linkedin.com>
Authored: Fri Aug 25 12:13:37 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Fri Aug 25 12:16:14 2017 -0700

----------------------------------------------------------------------
 .../rebalancer/util/RebalanceScheduler.java     |  2 +-
 .../org/apache/helix/task/JobRebalancer.java    |  1 +
 .../org/apache/helix/task/TaskRebalancer.java   | 14 +++
 .../apache/helix/task/WorkflowRebalancer.java   | 37 +++++++-
 .../helix/integration/task/TaskTestUtil.java    |  3 +-
 .../apache/helix/task/TestCleanExpiredJobs.java | 92 ++++++++++++++++++++
 6 files changed, 146 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/361fdc9c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
index 641c755..4721e9b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
@@ -84,7 +84,7 @@ public class RebalanceScheduler {
    */
   public long getRebalanceTime(String resource) {
     ScheduledTask task = _rebalanceTasks.get(resource);
-    if (task != null) {
+    if (task != null && !task.getFuture().isDone()) {
       return task.getStartTime();
     }
     return -1;

http://git-wip-us.apache.org/repos/asf/helix/blob/361fdc9c/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 5f2bc57..c8deb35 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -520,6 +520,7 @@ public class JobRebalancer extends TaskRebalancer {
     if (isWorkflowFinished(workflowContext, workflowConfig)) {
       workflowContext.setFinishTime(currentTime);
     }
+    scheduleJobCleanUp(jobName, workflowConfig, currentTime);
   }
 
   private void scheduleForNextTask(String job, JobContext jobCtx, long now) {

http://git-wip-us.apache.org/repos/asf/helix/blob/361fdc9c/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 27741ca..569e789 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -234,6 +234,20 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     if (isWorkflowFinished(workflowContext, workflowConfig)) {
       workflowContext.setFinishTime(currentTime);
     }
+    scheduleJobCleanUp(jobName, workflowConfig, currentTime);
+  }
+
+  /** Schedules a workflow rebalance at this job's expiry unless one is pending at or before it. */
+  protected void scheduleJobCleanUp(String jobName, WorkflowConfig workflowConfig,
+      long currentTime) {
+    JobConfig jobConfig = TaskUtil.getJobCfg(_manager, jobName);
+    // Read the pending time once; a second read can return -1 if that rebalance just fired.
+    long scheduledTime = _scheduledRebalancer.getRebalanceTime(workflowConfig.getWorkflowId());
+    long currentScheduledTime = scheduledTime == -1 ? Long.MAX_VALUE : scheduledTime;
+    if (jobConfig != null && currentTime + jobConfig.getExpiry() < currentScheduledTime) {
+      _scheduledRebalancer.scheduleRebalance(_manager, workflowConfig.getWorkflowId(),
+          currentTime + jobConfig.getExpiry());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/361fdc9c/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 01b3f6a..11c6a61 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -120,6 +120,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
       LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
     }
 
+    cleanExpiredJobs(workflowCfg, workflowCtx);
+
     TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
     return buildEmptyAssignment(workflow, currStateOutput);
   }
@@ -186,7 +188,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
         }
       }
     }
-    if (timeToSchedule < Long.MAX_VALUE) {
+    long currentScheduledTime = _scheduledRebalancer.getRebalanceTime(workflow) == -1
+        ? Long.MAX_VALUE
+        : _scheduledRebalancer.getRebalanceTime(workflow);
+    if (timeToSchedule < currentScheduledTime) {
       _scheduledRebalancer.scheduleRebalance(_manager, workflow, timeToSchedule);
     }
   }
@@ -309,6 +314,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
         long offsetMultiplier = (-delayFromStart) / period;
         long timeToSchedule = period * offsetMultiplier + startTime.getTime();
 
+
         // Now clone the workflow if this clone has not yet been created
         DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
         df.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -538,6 +544,35 @@ public class WorkflowRebalancer extends TaskRebalancer {
     _scheduledRebalancer.removeScheduledRebalance(job);
   }
 
+  private void cleanExpiredJobs(WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
+    if (workflowContext == null) {
+      return;
+    }
+    // Clean up COMPLETED/FAILED jobs past expiry; schedule a rebalance for the next expiry.
+    Map<String, TaskState> jobStates = workflowContext.getJobStates();
+    long newTimeToClean = Long.MAX_VALUE;
+    for (String job : workflowConfig.getJobDag().getAllNodes()) {
+      JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+      JobContext jobContext = TaskUtil.getJobContext(_manager, job);
+      // JobQueue jobs have no ABORTED state (they die with the workflow), so it is not checked.
+      if (jobConfig != null && jobContext != null && jobStates.containsKey(job) && (
+          jobStates.get(job) == TaskState.COMPLETED || jobStates.get(job) == TaskState.FAILED)) {
+        long expiryTime = jobConfig.getExpiry() + jobContext.getFinishTime();
+        if (System.currentTimeMillis() >= expiryTime) {
+          cleanupJob(job, workflowConfig.getWorkflowId());
+        } else {
+          newTimeToClean = Math.min(newTimeToClean, expiryTime);
+        }
+      }
+    }
+    // getRebalanceTime returns -1 when no rebalance is pending; treat that as "never".
+    long scheduledTime = _scheduledRebalancer.getRebalanceTime(workflowConfig.getWorkflowId());
+    long currentScheduledTime = scheduledTime == -1 ? Long.MAX_VALUE : scheduledTime;
+    if (newTimeToClean < currentScheduledTime) {
+      _scheduledRebalancer.scheduleRebalance(_manager, workflowConfig.getWorkflowId(), newTimeToClean);
+    }
+  }
+
   @Override
   public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
       CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {

http://git-wip-us.apache.org/repos/asf/helix/blob/361fdc9c/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index cdeebf4..d89533a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -246,9 +246,10 @@ public class TaskTestUtil {
     return workflowContext;
   }
 
-  public static JobContext buildJobContext(Long startTime, TaskPartitionState... partitionStates) {
+  public static JobContext buildJobContext(Long startTime, Long finishTime, TaskPartitionState... partitionStates) {
     JobContext jobContext = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
     jobContext.setStartTime(startTime == null ? System.currentTimeMillis() : startTime);
+    jobContext.setFinishTime(finishTime == null ? System.currentTimeMillis() : finishTime);
     int partitionId = 0;
     for (TaskPartitionState partitionState : partitionStates) {
       jobContext.setPartitionState(partitionId++, partitionState);

http://git-wip-us.apache.org/repos/asf/helix/blob/361fdc9c/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
new file mode 100644
index 0000000..16df022
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
@@ -0,0 +1,92 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
+  private ClusterDataCache _cache;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    super.beforeClass();
+  }
+
+  @Test
+  public void testCleanExpiredJobs() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
+
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < 5; i++) {
+      builder.enqueueJob("JOB" + i, jobBuilder);
+      TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB" + i),
+          TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
+    }
+    // Every job has a 1 ms expiry, but only COMPLETED/FAILED jobs get cleaned: 3 of 5 remain.
+    WorkflowContext workflowContext = TaskTestUtil
+        .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED);
+    _driver.start(builder.build());
+    _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
+    TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+    TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName);
+    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 3);
+  }
+
+  @Test public void testNotCleanJobsDueToParentFail() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
+
+    long startTime = System.currentTimeMillis();
+    builder.enqueueJob("JOB0", jobBuilder);
+    builder.enqueueJob("JOB1", jobBuilder);
+    builder.addParentChildDependency("JOB0", "JOB1");
+    TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB0"),
+        TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
+    // JOB1 is given no JobContext here, so it cannot expire even though it is marked FAILED.
+    WorkflowContext workflowContext = TaskTestUtil
+        .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.FAILED,
+            TaskState.FAILED);
+    _driver.start(builder.build());
+    _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
+    TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+    TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName);
+    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 1);
+  }
+}


[2/2] helix git commit: Disable ResourceMonitor for jobs

Posted by jx...@apache.org.
Disable ResourceMonitor for jobs


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/78ed261e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/78ed261e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/78ed261e

Branch: refs/heads/master
Commit: 78ed261e7afea59c4898ec28caa77f0569e3b5f2
Parents: 361fdc9
Author: Junkai Xue <jx...@linkedin.com>
Authored: Fri Aug 25 12:27:58 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Fri Aug 25 12:27:58 2017 -0700

----------------------------------------------------------------------
 .../org/apache/helix/controller/stages/ClusterDataCache.java   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/78ed261e/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 89d483b..93555cd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -101,12 +101,14 @@ public class ClusterDataCache {
       _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
       _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
       _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
-      _resourceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
     }
     _idealStateMap = Maps.newHashMap(_idealStateCacheMap);
     _liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap);
     _instanceConfigMap = Maps.newHashMap(_instanceConfigCacheMap);
-    _resourceConfigMap = Maps.newHashMap(_resourceConfigCacheMap);
+
+    // TODO: Listen for resource config changes instead of fetching them every time,
+    //       and then add back _resourceConfigCacheMap.
+    _resourceConfigMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
 
     _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
     _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());