You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:24 UTC

[28/33] helix git commit: Check whether instance is disabled when assigning tasks to instances in TaskRebalancer.

Check whether instance is disabled when assigning tasks to instances in TaskRebalancer.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c3624e08
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c3624e08
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c3624e08

Branch: refs/heads/helix-0.6.x
Commit: c3624e082b4025945f1d7755c73ce999bcac6783
Parents: 7e9041f
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu May 5 15:51:58 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 16:18:25 2016 -0700

----------------------------------------------------------------------
 .../FixedTargetTaskAssignmentCalculator.java    | 18 +++++-
 .../helix/integration/task/TaskTestBase.java    | 15 ++++-
 .../task/TestTaskWithInstanceDisabled.java      | 58 ++++++++++++++++++++
 3 files changed, 88 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c3624e08/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 60cd92f..0a2e8c5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -31,6 +31,7 @@ import java.util.TreeSet;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
@@ -65,7 +66,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
     }
     Set<String> tgtStates = jobCfg.getTargetPartitionStates();
     return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
-        jobContext);
+        jobContext, cache);
   }
 
   /**
@@ -130,7 +131,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
    */
   private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
       CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
-      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
+      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx, ClusterDataCache cache) {
     Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
     for (String instance : instances) {
       result.put(instance, new TreeSet<Integer>());
@@ -151,6 +152,19 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
           if (pendingMessage != null) {
             continue;
           }
+
+          InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance);
+
+          if (instanceConfig == null) {
+            LOG.error("Instance config not found for instance : " + instance);
+            continue;
+          }
+
+          if (!instanceConfig.getInstanceEnabled()) {
+            LOG.debug("Instance has been disabled, ignore instance : " + instance);
+            continue;
+          }
+
           String s =
               currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
                   instance);

http://git-wip-us.apache.org/repos/asf/helix/blob/c3624e08/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
index d6cf9bd..1bb72cc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
@@ -51,6 +51,7 @@ public class TaskTestBase extends ZkIntegrationTestBase {
   protected int _numDbs = 1;
 
   protected Boolean _partitionVary = true;
+  protected Boolean _instanceGroupTag = false;
 
   protected ClusterControllerManager _controller;
 
@@ -76,6 +77,9 @@ public class TaskTestBase extends ZkIntegrationTestBase {
     for (int i = 0; i < _numNodes; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      if (_instanceGroupTag) {
+        _setupTool.addInstanceTag(CLUSTER_NAME, storageNodeName, "TESTTAG" + i);
+      }
     }
 
     // Set up target db
@@ -90,7 +94,16 @@ public class TaskTestBase extends ZkIntegrationTestBase {
         _testDbs.add(db);
       }
     } else {
-      _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions, MASTER_SLAVE_STATE_MODEL);
+      if (_instanceGroupTag) {
+        _setupTool
+            .addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions,
+                "OnlineOffline", IdealState.RebalanceMode.FULL_AUTO.name());
+        IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
+        idealState.setInstanceGroupTag("TESTTAG0");
+        _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState);
+      } else {
+        _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions, MASTER_SLAVE_STATE_MODEL);
+      }
       _setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numReplicas);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c3624e08/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
new file mode 100644
index 0000000..1c5bd36
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
@@ -0,0 +1,58 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestTaskWithInstanceDisabled extends TaskTestBase { // checks task placement when one instance is disabled
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numDbs = 1;
+    _numNodes = 2; // two participants: the test disables the first and expects the second to get the task
+    _numParitions = 1;
+    _numReplicas = 1;
+    _partitionVary = false;
+    super.beforeClass(); // called after the fields are set; presumably the base setup reads them - confirm
+  }
+  @Test
+  public void testTaskWithInstanceDisabled() throws InterruptedException {
+    _setupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, PARTICIPANT_PREFIX + "_" + (_startPort + 0), false); // false: presumably disables it
+    String jobResource = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB); // job targets the default test DB resource
+    Workflow flow =
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
+    _driver.start(flow);
+
+    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED, TaskState.FAILED); // NOTE(review): presumably waits for either state - confirm
+    JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
+    Assert.assertEquals(ctx.getAssignedParticipant(0), PARTICIPANT_PREFIX + "_" + (_startPort + 1)); // partition 0 must be on the second node
+  }
+}