You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:24 UTC
[28/33] helix git commit: Check whether instance is disabled when
assigning tasks to instances in TaskRebalancer.
Check whether instance is disabled when assigning tasks to instances in TaskRebalancer.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c3624e08
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c3624e08
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c3624e08
Branch: refs/heads/helix-0.6.x
Commit: c3624e082b4025945f1d7755c73ce999bcac6783
Parents: 7e9041f
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu May 5 15:51:58 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 16:18:25 2016 -0700
----------------------------------------------------------------------
.../FixedTargetTaskAssignmentCalculator.java | 18 +++++-
.../helix/integration/task/TaskTestBase.java | 15 ++++-
.../task/TestTaskWithInstanceDisabled.java | 58 ++++++++++++++++++++
3 files changed, 88 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c3624e08/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 60cd92f..0a2e8c5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -31,6 +31,7 @@ import java.util.TreeSet;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
@@ -65,7 +66,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
}
Set<String> tgtStates = jobCfg.getTargetPartitionStates();
return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
- jobContext);
+ jobContext, cache);
}
/**
@@ -130,7 +131,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
*/
private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
- Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
+ Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx, ClusterDataCache cache) {
Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
for (String instance : instances) {
result.put(instance, new TreeSet<Integer>());
@@ -151,6 +152,19 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
if (pendingMessage != null) {
continue;
}
+
+ InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance);
+
+ if (instanceConfig == null) {
+ LOG.error("Instance config not found for instance : " + instance);
+ continue;
+ }
+
+ if (!instanceConfig.getInstanceEnabled()) {
+ LOG.debug("Instance has been disabled, ignore instance : " + instance);
+ continue;
+ }
+
String s =
currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
instance);
http://git-wip-us.apache.org/repos/asf/helix/blob/c3624e08/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
index d6cf9bd..1bb72cc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
@@ -51,6 +51,7 @@ public class TaskTestBase extends ZkIntegrationTestBase {
protected int _numDbs = 1;
protected Boolean _partitionVary = true;
+ protected Boolean _instanceGroupTag = false;
protected ClusterControllerManager _controller;
@@ -76,6 +77,9 @@ public class TaskTestBase extends ZkIntegrationTestBase {
for (int i = 0; i < _numNodes; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
_setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ if (_instanceGroupTag) {
+ _setupTool.addInstanceTag(CLUSTER_NAME, storageNodeName, "TESTTAG" + i);
+ }
}
// Set up target db
@@ -90,7 +94,16 @@ public class TaskTestBase extends ZkIntegrationTestBase {
_testDbs.add(db);
}
} else {
- _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions, MASTER_SLAVE_STATE_MODEL);
+ if (_instanceGroupTag) {
+ _setupTool
+ .addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions,
+ "OnlineOffline", IdealState.RebalanceMode.FULL_AUTO.name());
+ IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
+ idealState.setInstanceGroupTag("TESTTAG0");
+ _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState);
+ } else {
+ _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions, MASTER_SLAVE_STATE_MODEL);
+ }
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numReplicas);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c3624e08/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
new file mode 100644
index 0000000..1c5bd36
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
@@ -0,0 +1,58 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestTaskWithInstanceDisabled extends TaskTestBase {
+ @Override
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _numDbs = 1;
+ _numNodes = 2;
+ _numParitions = 1;
+ _numReplicas = 1;
+ _partitionVary = false;
+ super.beforeClass();
+ }
+ @Test
+ public void testTaskWithInstanceDisabled() throws InterruptedException {
+ _setupTool.getClusterManagementTool()
+ .enableInstance(CLUSTER_NAME, PARTICIPANT_PREFIX + "_" + (_startPort + 0), false);
+ String jobResource = TestHelper.getTestMethodName();
+ JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB);
+ Workflow flow =
+ WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
+ _driver.start(flow);
+
+ TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED, TaskState.FAILED);
+ JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
+ Assert.assertEquals(ctx.getAssignedParticipant(0), PARTICIPANT_PREFIX + "_" + (_startPort + 1));
+ }
+}