You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/11/02 21:26:31 UTC

[1/3] helix git commit: [HELIX-786] TEST: Make TestQuotaBasedScheduling stable

Repository: helix
Updated Branches:
  refs/heads/master ccf263c91 -> 59536d39c


[HELIX-786] TEST: Make TestQuotaBasedScheduling stable

Because recent changes made the Controller run slower, TestQuotaBasedScheduling had become unstable. This RB fixes that.
Changelist:
1. Use polling instead of Thread.sleep()


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bced0996
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bced0996
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bced0996

Branch: refs/heads/master
Commit: bced0996ed65c9a886b5e04788e2cc1c88fc37b1
Parents: ccf263c
Author: narendly <na...@gmail.com>
Authored: Fri Nov 2 14:02:02 2018 -0700
Committer: narendly <na...@gmail.com>
Committed: Fri Nov 2 14:02:02 2018 -0700

----------------------------------------------------------------------
 .../helix/integration/task/TestQuotaBasedScheduling.java     | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/bced0996/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
index abbcf75..6080399 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
@@ -374,7 +374,6 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
 
     // Finish rest of the tasks
     _finishTask = true;
-    Thread.sleep(2000L);
   }
 
   /**
@@ -441,16 +440,13 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     Workflow secondWorkflow =
         createWorkflow("secondWorkflow", true, DEFAULT_QUOTA_TYPE, 1, 1, "ShortTask");
     _driver.start(secondWorkflow);
-    Thread.sleep(1000L); // Wait so that the Controller will try to process the workflow
 
     // At this point, secondWorkflow should still be in progress due to its task not being scheduled
     // due to thread pool saturation
-    TaskState secondWorkflowState = _driver.getWorkflowContext("secondWorkflow").getWorkflowState();
-    Assert.assertEquals(secondWorkflowState, TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState("secondWorkflow", 2000L, TaskState.IN_PROGRESS);
 
     // Finish rest of the tasks
     _finishTask = true;
-    Thread.sleep(2000L);
   }
 
   /**
@@ -503,7 +499,6 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
 
     // Finish rest of the tasks
     _finishTask = true;
-    Thread.sleep(2000L);
   }
 
   /**
@@ -577,7 +572,6 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
 
     // Finish rest of the tasks
     _finishTask = true;
-    Thread.sleep(2000L);
   }
 
   /**


[3/3] helix git commit: [HELIX-788] HELIX: Fix DefaultPipeline so that it doesn't rebalance task resources

Posted by jx...@apache.org.
[HELIX-788] HELIX: Fix DefaultPipeline so that it doesn't rebalance task resources

Helix CHO testing showed that the default pipeline was rebalancing Task Framework resources, which it should not do. This RB fixes that.
Changelist:
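1. In IntermediateStateCalcStage, replace resourceMap (the RESOURCES attribute) with resourceToRebalance (the RESOURCES_TO_REBALANCE attribute), which separates generic resources from task resources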
2. Log through LogUtil so that log messages from the two pipelines can be told apart


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/59536d39
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/59536d39
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/59536d39

Branch: refs/heads/master
Commit: 59536d39c85d3535408a40a46a1a60a4105ee6e4
Parents: dc25bac
Author: narendly <na...@gmail.com>
Authored: Fri Nov 2 14:19:16 2018 -0700
Committer: narendly <na...@gmail.com>
Committed: Fri Nov 2 14:19:16 2018 -0700

----------------------------------------------------------------------
 .../stages/IntermediateStateCalcStage.java      | 29 ++++++++++----------
 .../stages/ResourceComputationStage.java        |  2 +-
 .../stages/TestStateTransitionPrirority.java    |  3 +-
 3 files changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/59536d39/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 915a90f..9768bf7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -64,18 +64,19 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
 
     BestPossibleStateOutput bestPossibleStateOutput =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+    Map<String, Resource> resourceToRebalance =
+        event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
 
-    if (currentStateOutput == null || bestPossibleStateOutput == null || resourceMap == null
+    if (currentStateOutput == null || bestPossibleStateOutput == null || resourceToRebalance == null
         || cache == null) {
       throw new StageException(String.format("Missing attributes in event: %s. "
-              + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache (%s)",
-          event, currentStateOutput, bestPossibleStateOutput, resourceMap, cache));
+          + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache (%s)",
+          event, currentStateOutput, bestPossibleStateOutput, resourceToRebalance, cache));
     }
 
     IntermediateStateOutput intermediateStateOutput =
-        compute(event, resourceMap, currentStateOutput, bestPossibleStateOutput);
+        compute(event, resourceToRebalance, currentStateOutput, bestPossibleStateOutput);
     event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), intermediateStateOutput);
 
     // Make sure no instance has more replicas/partitions assigned than maxPartitionPerInstance. If
@@ -146,9 +147,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       String resourceName = resourcePriority.getResourceName();
 
       if (!bestPossibleStateOutput.containsResource(resourceName)) {
-        logger.warn(
-            "Skip calculating intermediate state for resource {} because the best possible state is not available.",
-            resourceName);
+        LogUtil.logInfo(logger, _eventId, String.format(
+            "Skip calculating intermediate state for resource %s because the best possible state is not available.",
+            resourceName));
         continue;
       }
 
@@ -228,8 +229,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
             instancePartitionCounts.put(instance, 0);
           }
           int partitionCount = instancePartitionCounts.get(instance); // Number of replicas (from
-          // different partitions) held
-          // in this instance
+                                                                      // different partitions) held
+                                                                      // in this instance
           partitionCount++;
           if (partitionCount > maxPartitionPerInstance) {
             HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
@@ -376,7 +377,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       // ErrorOrRecovery is set
       threshold = clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance();
       partitionCount += partitionsNeedRecovery.size(); // Only add this count when the threshold is
-      // set
+                                                       // set
     } else {
       if (clusterConfig.getErrorPartitionThresholdForLoadBalance() != 0) {
         // 0 is the default value so the old threshold has been set
@@ -736,8 +737,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       return RebalanceType.NONE; // No further action required
     } else {
       return RebalanceType.LOAD_BALANCE; // Required state counts are satisfied, but in order to
-      // achieve BestPossibleState, load balance may be required
-      // to shift replicas around
+                                         // achieve BestPossibleState, load balance may be required
+                                         // to shift replicas around
     }
   }
 
@@ -904,4 +905,4 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       return matchedState;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/59536d39/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 02a175a..15bae65 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -54,7 +54,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
 
     Map<String, IdealState> idealStates = cache.getIdealStates();
 
-    Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
+    Map<String, Resource> resourceMap = new LinkedHashMap<>();
     Map<String, Resource> resourceToRebalance = new LinkedHashMap<>();
 
     if (idealStates != null && idealStates.size() > 0) {

http://git-wip-us.apache.org/repos/asf/helix/blob/59536d39/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
index ef5139e..d4a6aaf 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
@@ -176,7 +176,8 @@ public class TestStateTransitionPrirority extends BaseStageTest {
 
     event.addAttribute(AttributeName.RESOURCES.name(),
         Collections.singletonMap(resourceName, resource));
-    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resource);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+        Collections.singletonMap(resourceName, resource));
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
     runStage(event, new ReadClusterDataStage());


[2/3] helix git commit: [HELIX-786] TASK: Fix stuck tasks after Participant connection loss

Posted by jx...@apache.org.
[HELIX-786] TASK: Fix stuck tasks after Participant connection loss

When a Helix Participant loses its ZooKeeper (ZK) connection and starts a new ZK session, all task partitions on that Participant are reset to the INIT state. This is undesirable because these tasks are effectively dropped and should be scheduled on another instance. This commit is the Controller-side fix for the problem. When the Controller detects tasks whose assigned Participants are no longer live, it marks them as DROPPED in their parent JobContext. As a result, AssignableInstance will not consider them active when it is refreshed in the next pipeline run, and the dropped tasks can be reassigned to other instances.

Note that a Participant-side fix must follow, so that task partitions end up in the DROPPED state, not the INIT state, when reset() is called on them. This change does not by itself resolve tasks stuck in INIT on the original Participant. However, because these tasks can now be assigned to other instances, their jobs and workflows can complete, at which point their CurrentStates are dropped altogether.

Changelist:
1. Mark task partitions whose assigned Participants are no longer live as DROPPED in JobContext


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/dc25bac1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/dc25bac1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/dc25bac1

Branch: refs/heads/master
Commit: dc25bac1ebdcddb08aaab2765abfe72008b06a31
Parents: bced099
Author: narendly <na...@gmail.com>
Authored: Fri Nov 2 14:03:16 2018 -0700
Committer: narendly <na...@gmail.com>
Committed: Fri Nov 2 14:03:16 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/task/AbstractTaskDispatcher.java    | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/dc25bac1/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index cbf9fb8..cb721e5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -693,6 +693,8 @@ public abstract class AbstractTaskDispatcher {
       if (isTaskNotInTerminalState(state)) {
         String assignedParticipant = jobContext.getAssignedParticipant(partitionNumber);
         if (assignedParticipant != null && !liveInstances.contains(assignedParticipant)) {
+          // The assigned instance is no longer live, so mark it as DROPPED in the context
+          jobContext.setPartitionState(partitionNumber, TaskPartitionState.DROPPED);
           filteredTasks.add(partitionNumber);
         }
       }