You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/03/17 17:35:50 UTC
[helix] branch zooscalability updated: Add integration tests for
Helix Java APIs (#892)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/zooscalability by this push:
new 1faaaca Add integration tests for Helix Java APIs (#892)
1faaaca is described below
commit 1faaaca379459eeaafcb49770582a3a731ad0439
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Tue Mar 17 10:35:43 2020 -0700
Add integration tests for Helix Java APIs (#892)
This commit adds a comprehensive integration test for Helix Java APIs. All Helix Java APIs are tested using regular resource rebalancing and task framework.
---
.../helix/manager/zk/ZkCacheBaseDataAccessor.java | 30 +-
.../java/org/apache/helix/task/TaskDriver.java | 6 +-
.../BestPossibleExternalViewVerifier.java | 4 +-
.../StrictMatchExternalViewVerifier.java | 4 +-
.../src/test/java/org/apache/helix/TestHelper.java | 5 +-
.../multizk/TestMultiZkHelixJavaApis.java | 476 +++++++++++++++++++++
.../helix/integration/task/WorkflowGenerator.java | 27 +-
7 files changed, 528 insertions(+), 24 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index bd05ea7..72d5601 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -40,6 +40,7 @@ import org.apache.helix.store.zk.ZNode;
import org.apache.helix.util.PathUtils;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
@@ -919,7 +920,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
}
}
- public static class Builder {
+ public static class Builder<T> {
private String _zkAddress;
private RealmAwareZkClient.RealmMode _realmMode;
private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
@@ -934,39 +935,39 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
public Builder() {
}
- public Builder setZkAddress(String zkAddress) {
+ public Builder<T> setZkAddress(String zkAddress) {
_zkAddress = zkAddress;
return this;
}
- public Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+ public Builder<T> setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
_realmMode = realmMode;
return this;
}
- public Builder setRealmAwareZkConnectionConfig(
+ public Builder<T> setRealmAwareZkConnectionConfig(
RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
_realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
return this;
}
- public Builder setRealmAwareZkClientConfig(
+ public Builder<T> setRealmAwareZkClientConfig(
RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
_realmAwareZkClientConfig = realmAwareZkClientConfig;
return this;
}
- public Builder setChrootPath(String chrootPath) {
+ public Builder<T> setChrootPath(String chrootPath) {
_chrootPath = chrootPath;
return this;
}
- public Builder setWtCachePaths(List<String> wtCachePaths) {
+ public Builder<T> setWtCachePaths(List<String> wtCachePaths) {
_wtCachePaths = wtCachePaths;
return this;
}
- public Builder setZkCachePaths(List<String> zkCachePaths) {
+ public Builder<T> setZkCachePaths(List<String> zkCachePaths) {
_zkCachePaths = zkCachePaths;
return this;
}
@@ -977,14 +978,14 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
* @param zkClientType
* @return
*/
- public Builder setZkClientType(ZkBaseDataAccessor.ZkClientType zkClientType) {
+ public Builder<T> setZkClientType(ZkBaseDataAccessor.ZkClientType zkClientType) {
_zkClientType = zkClientType;
return this;
}
- public ZkCacheBaseDataAccessor build() {
+ public ZkCacheBaseDataAccessor<T> build() {
validate();
- return new ZkCacheBaseDataAccessor(this);
+ return new ZkCacheBaseDataAccessor<>(this);
}
private void validate() {
@@ -997,8 +998,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
throw new HelixException(
"ZkCacheBaseDataAccessor: you cannot set ZkClientType on multi-realm mode!");
}
- // If ZkClientType is not set, default to SHARED
- if (!isZkClientTypeSet) {
+ // If ZkClientType is not set and realmMode is single-realm, default to SHARED
+ if (!isZkClientTypeSet && _realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM) {
_zkClientType = ZkBaseDataAccessor.ZkClientType.SHARED;
}
@@ -1025,7 +1026,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
// Resolve RealmAwareZkClientConfig
if (_realmAwareZkClientConfig == null) {
- _realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+ _realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig()
+ .setZkSerializer(new ZNRecordSerializer());
}
// Resolve RealmAwareZkConnectionConfig
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 0b8fa17..987cc44 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -38,17 +38,17 @@ import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.SystemPropertyKeys;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,10 +97,12 @@ public class TaskDriver {
manager.getHelixPropertyStore(), manager.getClusterName());
}
+ @Deprecated
public TaskDriver(HelixZkClient client, String clusterName) {
this(client, new ZkBaseDataAccessor<>(client), clusterName);
}
+ @Deprecated
public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor,
String clusterName) {
this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 7f57fa9..6d601c4 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -135,8 +135,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
}
public BestPossibleExternalViewVerifier build() {
- if (_clusterName == null || (_zkAddress == null && _zkClient == null)) {
- throw new IllegalArgumentException("Cluster name or zookeeper info is missing!");
+ if (_clusterName == null) {
+ throw new IllegalArgumentException("Cluster name is missing!");
}
if (_zkClient != null) {
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index 9c4a587..76f0d09 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -103,8 +103,8 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
private boolean _isDeactivatedNodeAware = false;
public StrictMatchExternalViewVerifier build() {
- if (_clusterName == null || (_zkAddress == null && _zkClient == null)) {
- throw new IllegalArgumentException("Cluster name or zookeeper info is missing!");
+ if (_clusterName == null) {
+ throw new IllegalArgumentException("Cluster name is missing!");
}
if (_zkClient != null) {
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 9cac992..adb4812 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -47,6 +47,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
@@ -295,12 +296,12 @@ public class TestHelper {
zkClient.close();
}
- public static void dropCluster(String clusterName, HelixZkClient zkClient) throws Exception {
+ public static void dropCluster(String clusterName, RealmAwareZkClient zkClient) {
ClusterSetup setupTool = new ClusterSetup(zkClient);
dropCluster(clusterName, zkClient, setupTool);
}
- public static void dropCluster(String clusterName, HelixZkClient zkClient, ClusterSetup setup) {
+ public static void dropCluster(String clusterName, RealmAwareZkClient zkClient, ClusterSetup setup) {
String namespace = "/" + clusterName;
if (zkClient.exists(namespace)) {
try {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
new file mode 100644
index 0000000..be8bb0b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -0,0 +1,476 @@
+package org.apache.helix.integration.multizk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.InstanceType;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.config.RebalanceConfig;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * TestMultiZkHelixJavaApis spins up multiple in-memory ZooKeepers with a pre-configured
+ * cluster-Zk realm routing information.
+ * This test verifies that all Helix Java APIs work as expected.
+ */
+public class TestMultiZkHelixJavaApis {
+ private static final int NUM_ZK = 3;
+ private static final Map<String, ZkServer> ZK_SERVER_MAP = new HashMap<>();
+ private static final Map<String, HelixZkClient> ZK_CLIENT_MAP = new HashMap<>();
+ private static final Map<String, ClusterControllerManager> MOCK_CONTROLLERS = new HashMap<>();
+ private static final Set<MockParticipantManager> MOCK_PARTICIPANTS = new HashSet<>();
+ private static final List<String> CLUSTER_LIST =
+ ImmutableList.of("CLUSTER_1", "CLUSTER_2", "CLUSTER_3");
+
+ private MockMetadataStoreDirectoryServer _msds;
+ private static final Map<String, Collection<String>> _rawRoutingData = new HashMap<>();
+ private RealmAwareZkClient _zkClient;
+ private HelixAdmin _zkHelixAdmin;
+
+ // Save System property configs from before this test and pass onto after the test
+ private Map<String, String> _configStore = new HashMap<>();
+
+ /**
+ * Starts NUM_ZK in-memory ZooKeeper servers (with one dedicated ZkClient each), maps one
+ * cluster from CLUSTER_LIST to each server in the raw routing data, starts a mock MSDS that
+ * is given that routing data, and turns on multi-ZK mode through System properties.
+ * Values the two touched System properties had before this test are saved in _configStore.
+ */
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ // Create 3 in-memory zookeepers and routing mapping
+ final String zkPrefix = "localhost:";
+ final int zkStartPort = 8777;
+
+ for (int i = 0; i < NUM_ZK; i++) {
+ String zkAddress = zkPrefix + (zkStartPort + i);
+ ZK_SERVER_MAP.put(zkAddress, TestHelper.startZkServer(zkAddress));
+ ZK_CLIENT_MAP.put(zkAddress, DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+ new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer())));
+
+ // One cluster per ZkServer created
+ _rawRoutingData.put(zkAddress, Collections.singletonList("/" + CLUSTER_LIST.get(i)));
+ }
+
+ // Create a Mock MSDS
+ final String msdsHostName = "localhost";
+ final int msdsPort = 11117;
+ final String msdsNamespace = "multiZkTest";
+ _msds = new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, msdsNamespace,
+ _rawRoutingData);
+ _msds.startServer();
+
+ // Save previously-set system configs
+ // (only non-null values are stored; an absent key means the property was not set before)
+ String prevMultiZkEnabled = System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+ String prevMsdsServerEndpoint =
+ System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+ if (prevMultiZkEnabled != null) {
+ _configStore.put(SystemPropertyKeys.MULTI_ZK_ENABLED, prevMultiZkEnabled);
+ }
+ if (prevMsdsServerEndpoint != null) {
+ _configStore
+ .put(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, prevMsdsServerEndpoint);
+ }
+
+ // Turn on multiZk mode in System config
+ System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
+ // MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest
+ System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+ "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace);
+
+ // Create a FederatedZkClient for admin work
+ // (constructed only after the System properties above have been set)
+ _zkClient =
+ new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
+ new RealmAwareZkClient.RealmAwareZkClientConfig());
+ }
+
+ /**
+ * Stops all mock controllers and participants, drops every test cluster, verifies that the
+ * clusters are gone from each ZooKeeper, and then releases the ZkClients, the ZooKeeper
+ * servers and the mock MSDS.
+ * The System properties changed by this test are restored in a finally block, so they are
+ * reset even if tear-down throws.
+ */
+ @AfterClass
+ public void afterClass() throws Exception {
+ try {
+ // Kill all mock controllers and participants
+ MOCK_CONTROLLERS.values().forEach(ClusterControllerManager::syncStop);
+ MOCK_PARTICIPANTS.forEach(MockParticipantManager::syncStop);
+
+ // Tear down all clusters
+ CLUSTER_LIST.forEach(cluster -> TestHelper.dropCluster(cluster, _zkClient));
+
+ // Verify that all clusters are gone in each zookeeper
+ Assert.assertTrue(TestHelper.verify(() -> {
+ for (Map.Entry<String, HelixZkClient> zkClientEntry : ZK_CLIENT_MAP.entrySet()) {
+ List<String> children = zkClientEntry.getValue().getChildren("/");
+ if (children.stream().anyMatch(CLUSTER_LIST::contains)) {
+ return false;
+ }
+ }
+ return true;
+ }, TestHelper.WAIT_DURATION));
+
+ // Close every ZkClient before its server is shut down so no connection is leaked
+ ZK_CLIENT_MAP.forEach((zkAddress, zkClient) -> zkClient.close());
+ _zkClient.close();
+
+ // Tear down zookeepers
+ ZK_SERVER_MAP.forEach((zkAddress, zkServer) -> zkServer.shutdown());
+
+ // Stop MockMSDS
+ _msds.stopServer();
+ } finally {
+ // Restore System property configs
+ // (a key missing from _configStore means the property was unset before the test)
+ if (_configStore.containsKey(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED,
+ _configStore.get(SystemPropertyKeys.MULTI_ZK_ENABLED));
+ } else {
+ System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+ }
+ if (_configStore.containsKey(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY)) {
+ System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+ _configStore.get(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY));
+ } else {
+ System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+ }
+ }
+ }
+
+ /**
+ * Test cluster creation according to the pre-set routing mapping.
+ * Helix Java API tested is ClusterSetup in this method.
+ * Clusters are created and verified once per ClusterSetup instance, then created a final
+ * time so that the tests depending on this one have clusters to work with.
+ */
+ @Test
+ public void testCreateClusters() {
+ // Create two ClusterSetups using two different constructors
+ // Note: ZK Address here could be anything because multiZk mode is on (it will be ignored)
+ ClusterSetup clusterSetupZkAddr = new ClusterSetup(ZK_SERVER_MAP.keySet().iterator().next());
+ ClusterSetup clusterSetupBuilder = new ClusterSetup.Builder().build();
+
+ createClusters(clusterSetupZkAddr);
+ verifyClusterCreation(clusterSetupZkAddr);
+
+ createClusters(clusterSetupBuilder);
+ verifyClusterCreation(clusterSetupBuilder);
+
+ // Create clusters again to continue with testing
+ // (verifyClusterCreation() deletes each cluster after checking it)
+ createClusters(clusterSetupBuilder);
+ }
+
+ /**
+ * Adds every cluster named in CLUSTER_LIST through the given ClusterSetup.
+ * @param clusterSetup the ClusterSetup instance used to create the clusters
+ */
+ private void createClusters(ClusterSetup clusterSetup) {
+ CLUSTER_LIST.forEach(name -> clusterSetup.addCluster(name, false));
+ }
+
+ /**
+ * Asserts that each cluster path in the raw routing data exists, both through the
+ * single-realm ZkClient of the mapped ZooKeeper address and through the realm-aware client.
+ * Side effect: every verified cluster is deleted through the given ClusterSetup.
+ * @param clusterSetup the ClusterSetup instance used to delete the clusters after the check
+ */
+ private void verifyClusterCreation(ClusterSetup clusterSetup) {
+ // Verify that clusters have been created correctly according to routing mapping
+ _rawRoutingData.forEach((zkAddress, cluster) -> {
+ // Note: clusterNamePath already contains "/"
+ // Only the first path of each routing entry is read
+ String clusterNamePath = cluster.iterator().next();
+
+ // Check with single-realm ZkClients
+ Assert.assertTrue(ZK_CLIENT_MAP.get(zkAddress).exists(clusterNamePath));
+ // Check with realm-aware ZkClient (federated)
+ Assert.assertTrue(_zkClient.exists(clusterNamePath));
+
+ // Remove clusters
+ clusterSetup
+ .deleteCluster(clusterNamePath.substring(1)); // Need to remove "/" at the beginning
+ });
+ }
+
+ /**
+ * Test Helix Participant creation and addition.
+ * Helix Java APIs tested in this method are:
+ * ZkHelixAdmin and ZKHelixManager (mock participant/controller)
+ * After the add/verify/drop round for each admin, one mock controller and numParticipants
+ * mock participants per cluster are started and kept running for the following tests.
+ */
+ @Test(dependsOnMethods = "testCreateClusters")
+ public void testCreateParticipants() throws Exception {
+ // Create two HelixAdmins: one with the ZK address constructor and one with the Builder
+ // Note: ZK Address here could be anything because multiZk mode is on (it will be ignored)
+ HelixAdmin helixAdminZkAddr = new ZKHelixAdmin(ZK_SERVER_MAP.keySet().iterator().next());
+ HelixAdmin helixAdminBuilder = new ZKHelixAdmin.Builder().build();
+ // Keep the Builder-created admin in a field; later test methods read _zkHelixAdmin
+ _zkHelixAdmin = helixAdminBuilder;
+
+ String participantNamePrefix = "Node_";
+ int numParticipants = 5;
+ createParticipantsAndVerify(helixAdminZkAddr, numParticipants, participantNamePrefix);
+ createParticipantsAndVerify(helixAdminBuilder, numParticipants, participantNamePrefix);
+
+ // Create mock controller and participants for next tests
+ for (String cluster : CLUSTER_LIST) {
+ // Start a controller
+ // Note: in multiZK mode, ZK Addr is ignored
+ ClusterControllerManager mockController =
+ new ClusterControllerManager("DummyZK", cluster, "controller");
+ mockController.syncStart();
+ MOCK_CONTROLLERS.put(cluster, mockController);
+
+ for (int i = 0; i < numParticipants; i++) {
+ // Note: in multiZK mode, ZK Addr is ignored
+ InstanceConfig instanceConfig = new InstanceConfig(participantNamePrefix + i);
+ helixAdminBuilder.addInstance(cluster, instanceConfig);
+ MockParticipantManager mockNode =
+ new MockParticipantManager("DummyZK", cluster, participantNamePrefix + i);
+
+ // Register task state model for task framework testing in later methods
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+ taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
+ // Register a Task state model factory.
+ StateMachineEngine stateMachine = mockNode.getStateMachineEngine();
+ stateMachine
+ .registerStateModelFactory("Task", new TaskStateModelFactory(mockNode, taskFactoryReg));
+
+ mockNode.syncStart();
+ // Tracked so that afterClass() can stop it
+ MOCK_PARTICIPANTS.add(mockNode);
+ }
+ // Check that mockNodes are up
+ Assert.assertTrue(TestHelper
+ .verify(() -> helixAdminBuilder.getInstancesInCluster(cluster).size() == numParticipants,
+ TestHelper.WAIT_DURATION));
+ }
+ }
+
+ /**
+ * Adds numParticipants instances (named participantNamePrefix + index) to every cluster in
+ * CLUSTER_LIST through the given HelixAdmin, asserts that the children of each cluster's
+ * INSTANCES path match the expected names when read through both the single-realm and the
+ * realm-aware ZkClient, and finally drops the instances again.
+ * @param admin the HelixAdmin under test
+ * @param numParticipants number of instances to add per cluster
+ * @param participantNamePrefix prefix of every instance name
+ */
+ private void createParticipantsAndVerify(HelixAdmin admin, int numParticipants,
+ String participantNamePrefix) {
+ // Create participants in clusters
+ Set<String> participantNames = new HashSet<>();
+ CLUSTER_LIST.forEach(cluster -> {
+ for (int i = 0; i < numParticipants; i++) {
+ String participantName = participantNamePrefix + i;
+ participantNames.add(participantName);
+ InstanceConfig instanceConfig = new InstanceConfig(participantNamePrefix + i);
+ admin.addInstance(cluster, instanceConfig);
+ }
+ });
+
+ // Verify participants have been created properly
+ _rawRoutingData.forEach((zkAddress, cluster) -> {
+ // Note: clusterNamePath already contains "/"
+ String clusterNamePath = cluster.iterator().next();
+
+ // Check with single-realm ZkClients
+ List<String> instances =
+ ZK_CLIENT_MAP.get(zkAddress).getChildren(clusterNamePath + "/INSTANCES");
+ Assert.assertEquals(new HashSet<>(instances), participantNames);
+
+ // Check with realm-aware ZkClient (federated)
+ instances = _zkClient.getChildren(clusterNamePath + "/INSTANCES");
+ Assert.assertEquals(new HashSet<>(instances), participantNames);
+
+ // Remove Participants
+ // (substring(1) strips the leading "/" to get the cluster name)
+ participantNames.forEach(participant -> {
+ InstanceConfig instanceConfig = new InstanceConfig(participant);
+ admin.dropInstance(clusterNamePath.substring(1), instanceConfig);
+ });
+ });
+ }
+
+ /**
+ * Test that clusters and instances are set up properly.
+ * Helix Java APIs tested in this method is ZkUtil.
+ */
+ @Test(dependsOnMethods = "testCreateParticipants")
+ public void testZkUtil() {
+ CLUSTER_LIST.forEach(cluster -> {
+ _zkHelixAdmin.getInstancesInCluster(cluster).forEach(instance -> ZKUtil
+ .isInstanceSetup("DummyZk", cluster, instance, InstanceType.PARTICIPANT));
+ });
+ }
+
+ /**
+ * Create resources and see if things get rebalanced correctly.
+ * Helix Java APIs tested in this method are:
+ * ZkBaseDataAccessor
+ * ZkHelixClusterVerifier (BestPossible)
+ * IdealStates are written with the accessor built from a ZK address and read back with the
+ * accessor built from the Builder; the simple fields of both must match.
+ */
+ @Test(dependsOnMethods = "testZkUtil")
+ public void testCreateAndRebalanceResources() {
+ BaseDataAccessor<ZNRecord> dataAccessorZkAddr = new ZkBaseDataAccessor<>("DummyZk");
+ BaseDataAccessor<ZNRecord> dataAccessorBuilder =
+ new ZkBaseDataAccessor.Builder<ZNRecord>().build();
+
+ String resourceNamePrefix = "DB_";
+ int numResources = 5;
+ int numPartitions = 3;
+ // cluster name -> (IdealState record id -> record as written)
+ Map<String, Map<String, ZNRecord>> idealStateMap = new HashMap<>();
+
+ for (String cluster : CLUSTER_LIST) {
+ Set<String> resourceNames = new HashSet<>();
+ Set<String> liveInstancesNames = new HashSet<>(dataAccessorZkAddr
+ .getChildNames("/" + cluster + "/LIVEINSTANCES", AccessOption.PERSISTENT));
+
+ for (int i = 0; i < numResources; i++) {
+ String resource = cluster + "_" + resourceNamePrefix + i;
+ _zkHelixAdmin.addResource(cluster, resource, numPartitions, "MasterSlave",
+ IdealState.RebalanceMode.FULL_AUTO.name());
+ _zkHelixAdmin.rebalance(cluster, resource, 3);
+ resourceNames.add(resource);
+
+ // Update IdealState fields with ZkBaseDataAccessor
+ String resourcePath = "/" + cluster + "/IDEALSTATES/" + resource;
+ ZNRecord is = dataAccessorZkAddr.get(resourcePath, null, AccessOption.PERSISTENT);
+ is.setSimpleField(RebalanceConfig.RebalanceConfigProperty.REBALANCER_CLASS_NAME.name(),
+ DelayedAutoRebalancer.class.getName());
+ is.setSimpleField(RebalanceConfig.RebalanceConfigProperty.REBALANCE_STRATEGY.name(),
+ CrushEdRebalanceStrategy.class.getName());
+ dataAccessorZkAddr.set(resourcePath, is, AccessOption.PERSISTENT);
+ idealStateMap.computeIfAbsent(cluster, recordList -> new HashMap<>())
+ .putIfAbsent(is.getId(), is); // Save ZNRecord for comparison later
+ }
+
+ // Create a verifier to make sure all resources have been rebalanced
+ // (no ZK address or ZkClient is passed to the Builder here)
+ ZkHelixClusterVerifier verifier =
+ new BestPossibleExternalViewVerifier.Builder(cluster).setResources(resourceNames)
+ .setExpectLiveInstances(liveInstancesNames).build();
+ Assert.assertTrue(verifier.verifyByPolling());
+ }
+
+ // Using the ZkBaseDataAccessor created using the Builder, check that the correct IS is read
+ for (String cluster : CLUSTER_LIST) {
+ Map<String, ZNRecord> savedIdealStates = idealStateMap.get(cluster);
+ List<String> resources = dataAccessorBuilder
+ .getChildNames("/" + cluster + "/IDEALSTATES", AccessOption.PERSISTENT);
+ resources.forEach(resource -> {
+ ZNRecord is = dataAccessorBuilder
+ .get("/" + cluster + "/IDEALSTATES/" + resource, null, AccessOption.PERSISTENT);
+ // Only the simple fields are compared
+ Assert
+ .assertEquals(is.getSimpleFields(), savedIdealStates.get(is.getId()).getSimpleFields());
+ });
+ }
+ }
+
+ /**
+ * This method tests ConfigAccessor.
+ * The same set-and-verify routine (setClusterConfigAndVerify) is run once for a
+ * ConfigAccessor created from a ZK address string and once for one created by the Builder.
+ */
+ @Test(dependsOnMethods = "testCreateAndRebalanceResources")
+ public void testConfigAccessor() {
+ // Build two ConfigAccessors to read and write:
+ // 1. ConfigAccessor using a deprecated constructor
+ // 2. ConfigAccessor using the Builder
+ ConfigAccessor configAccessorZkAddr = new ConfigAccessor("DummyZk");
+ ConfigAccessor configAccessorBuilder = new ConfigAccessor.Builder().build();
+
+ setClusterConfigAndVerify(configAccessorZkAddr);
+ setClusterConfigAndVerify(configAccessorBuilder);
+ }
+
+ /**
+ * For every routing entry, writes a ClusterConfig carrying a "configAccessor" simple field
+ * through the given accessor, then asserts that the same config is read back by a
+ * single-realm ConfigAccessor and by the dedicated ZkClient of the mapped ZooKeeper address.
+ * The cluster config is overwritten with a fresh ClusterConfig afterwards.
+ * @param configAccessorMultiZk the ConfigAccessor under test
+ */
+ private void setClusterConfigAndVerify(ConfigAccessor configAccessorMultiZk) {
+ _rawRoutingData.forEach((zkAddr, clusterNamePathList) -> {
+ // Need to get rid of "/" because this is a sharding key
+ String cluster = clusterNamePathList.iterator().next().substring(1);
+ ClusterConfig clusterConfig = new ClusterConfig(cluster);
+ clusterConfig.getRecord().setSimpleField("configAccessor", cluster);
+ configAccessorMultiZk.setClusterConfig(cluster, clusterConfig);
+
+ // Now check with a single-realm ConfigAccessor
+ ConfigAccessor configAccessorSingleZk =
+ new ConfigAccessor.Builder().setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+ .setZkAddress(zkAddr).build();
+ Assert.assertEquals(configAccessorSingleZk.getClusterConfig(cluster), clusterConfig);
+
+ // Also check with a single-realm dedicated ZkClient
+ ZNRecord clusterConfigRecord =
+ ZK_CLIENT_MAP.get(zkAddr).readData("/" + cluster + "/CONFIGS/CLUSTER/" + cluster);
+ Assert.assertEquals(clusterConfigRecord, clusterConfig.getRecord());
+
+ // Clean up
+ clusterConfig = new ClusterConfig(cluster);
+ configAccessorMultiZk.setClusterConfig(cluster, clusterConfig);
+ });
+ }
+
+ /**
+ * This test submits multiple tasks to be run.
+ * The Helix Java APIs tested in this method are TaskDriver (HelixManager) and
+ * ZkHelixPropertyStore/ZkCacheBaseDataAccessor.
+ * The same workflow is started in every cluster; for each cluster the workflow state
+ * returned by TaskDriver is compared with the state in the context read via the property
+ * store.
+ */
+ @Test(dependsOnMethods = "testConfigAccessor")
+ public void testTaskFramework() throws InterruptedException {
+ // Note: TaskDriver is like HelixManager - it only operates on one designated cluster
+ // Create TaskDrivers for all clusters
+ Map<String, TaskDriver> taskDriverMap = new HashMap<>();
+ MOCK_CONTROLLERS
+ .forEach((cluster, controller) -> taskDriverMap.put(cluster, new TaskDriver(controller)));
+
+ // Create a Task Framework workload and start
+ Workflow workflow = WorkflowGenerator.generateNonTargetedSingleWorkflowBuilder("job").build();
+ for (TaskDriver taskDriver : taskDriverMap.values()) {
+ taskDriver.start(workflow);
+ }
+
+ // Use multi-ZK ZkHelixPropertyStore/ZkCacheBaseDataAccessor to query for workflow/job states
+ HelixPropertyStore<ZNRecord> propertyStore =
+ new ZkHelixPropertyStore.Builder<ZNRecord>().build();
+ for (Map.Entry<String, TaskDriver> entry : taskDriverMap.entrySet()) {
+ String cluster = entry.getKey();
+ TaskDriver driver = entry.getValue();
+ // Wait until workflow has completed
+ TaskState wfStateFromTaskDriver =
+ driver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED);
+ // The path is absolute and begins with the cluster name
+ String workflowContextPath =
+ "/" + cluster + "/PROPERTYSTORE/TaskRebalancer/" + workflow.getName() + "/Context";
+ ZNRecord workflowContextRecord =
+ propertyStore.get(workflowContextPath, null, AccessOption.PERSISTENT);
+ WorkflowContext context = new WorkflowContext(workflowContextRecord);
+
+ // Compare the workflow state read from PropertyStore and TaskDriver
+ Assert.assertEquals(context.getWorkflowState(), wfStateFromTaskDriver);
+ }
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 40e2dcf..c582ab3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -19,11 +19,14 @@ package org.apache.helix.integration.task;
* under the License.
*/
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.Workflow;
/**
@@ -35,12 +38,14 @@ public class WorkflowGenerator {
public static final String JOB_NAME_2 = "SomeJob2";
public static final Map<String, String> DEFAULT_JOB_CONFIG;
+ public static final Map<String, String> DEFAULT_JOB_CONFIG_NOT_TARGETED;
static {
Map<String, String> tmpMap = new TreeMap<String, String>();
- tmpMap.put("TargetResource", DEFAULT_TGT_DB);
- tmpMap.put("TargetPartitionStates", "MASTER");
tmpMap.put("Command", MockTask.TASK_COMMAND);
tmpMap.put("TimeoutPerPartition", String.valueOf(10 * 1000));
+ DEFAULT_JOB_CONFIG_NOT_TARGETED = Collections.unmodifiableMap(new TreeMap<>(tmpMap));
+ tmpMap.put("TargetResource", DEFAULT_TGT_DB);
+ tmpMap.put("TargetPartitionStates", "MASTER");
DEFAULT_JOB_CONFIG = Collections.unmodifiableMap(tmpMap);
}
@@ -57,6 +62,24 @@ public class WorkflowGenerator {
return generateSingleJobWorkflowBuilder(jobName, jobBuilder);
}
+ /**
+ * Builds a single-job workflow whose job is created from DEFAULT_JOB_CONFIG_NOT_TARGETED
+ * and carries five explicitly-configured tasks that run MockTask.TASK_COMMAND.
+ * @param jobName passed to generateSingleJobWorkflowBuilder together with the job builder
+ * @return the builder returned by generateSingleJobWorkflowBuilder
+ */
+ public static Workflow.Builder generateNonTargetedSingleWorkflowBuilder(String jobName) {
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG_NOT_TARGETED);
+ jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+
+ // Create 5 TaskConfigs
+ // (task ids are "task_0" through "task_4")
+ List<TaskConfig> taskConfigs = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+ taskConfigBuilder.setTaskId("task_" + i);
+ taskConfigBuilder.addConfig("Timeout", String.valueOf(2000));
+ taskConfigBuilder.setCommand(MockTask.TASK_COMMAND);
+ taskConfigs.add(taskConfigBuilder.build());
+ }
+
+ jobBuilder.addTaskConfigs(taskConfigs);
+ return generateSingleJobWorkflowBuilder(jobName, jobBuilder);
+ }
+
public static Workflow.Builder generateSingleJobWorkflowBuilder(String jobName,
JobConfig.Builder jobBuilder) {
return new Workflow.Builder(jobName).addJobConfig(jobName, jobBuilder);