You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:24 UTC
[helix] 21/37: Implement AssignmentMetadataStore (#453)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 3d2da32bddc7798a502ae170873f0fb1abe8cbd5
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Mon Sep 9 16:40:24 2019 -0700
Implement AssignmentMetadataStore (#453)
Implement AssignmentMetadataStore
AssignmentMetadataStore is a component for the new WAGED Rebalancer. It provides APIs that allow the rebalancer to read and write the baseline and best possible assignments using BucketDataAccessor.
Changelist:
1. Add AssignmentMetadataStore
2. Add an integration test: TestAssignmentMetadataStore
---
.../rebalancer/waged/AssignmentMetadataStore.java | 112 +++++++++++++++++++--
.../rebalancer/waged/WagedRebalancer.java | 64 ++++++------
.../manager/zk/ZNRecordJacksonSerializer.java | 4 +-
.../waged/MockAssignmentMetadataStore.java | 9 +-
.../waged/TestAssignmentMetadataStore.java | 101 +++++++++++++++++++
5 files changed, 244 insertions(+), 46 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index cc52dac..bf9f292 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -19,6 +19,16 @@ package org.apache.helix.controller.rebalancer.waged;
* under the License.
*/
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.BucketDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.ResourceAssignment;
import java.util.HashMap;
@@ -28,24 +38,106 @@ import java.util.Map;
- * A placeholder before we have the real assignment metadata store.
+ * Reads and writes the WAGED rebalancer's baseline and best possible assignments through a
+ * {@link BucketDataAccessor} backed by ZooKeeper. The most recently read or persisted value of
+ * each assignment is cached in memory so that repeated reads do not go back to ZooKeeper.
*/
public class AssignmentMetadataStore {
-  private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
-  private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();
+  private static final String ASSIGNMENT_METADATA_KEY = "ASSIGNMENT_METADATA";
+  private static final String BASELINE_TEMPLATE = "/%s/%s/BASELINE";
+  private static final String BEST_POSSIBLE_TEMPLATE = "/%s/%s/BEST_POSSIBLE";
+  private static final String BASELINE_KEY = "BASELINE";
+  private static final String BEST_POSSIBLE_KEY = "BEST_POSSIBLE";
+  private static final ZkSerializer SERIALIZER = new ZNRecordJacksonSerializer();
+
+  // Left null when constructed without a HelixManager (in-memory subclasses only).
+  private BucketDataAccessor _dataAccessor;
+  private String _baselinePath;
+  private String _bestPossiblePath;
+  // In-memory copies of the last read or persisted assignments; null until first loaded.
+  private Map<String, ResourceAssignment> _globalBaseline;
+  private Map<String, ResourceAssignment> _bestPossibleAssignment;
+
+  /**
+   * Creates a store whose data lives under /{clusterName}/ASSIGNMENT_METADATA/ in the metadata
+   * store the given manager points at.
+   * @param helixManager source of the metadata store address and cluster name. May be null only
+   *          for subclasses that override every read and persist method (e.g. in-memory mocks);
+   *          such an instance has no backing store.
+   */
+  AssignmentMetadataStore(HelixManager helixManager) {
+    if (helixManager == null) {
+      // Dereferencing a null manager would throw before an in-memory subclass could replace the
+      // ZooKeeper-backed behavior, so leave the accessor and paths unset instead.
+      return;
+    }
+    _dataAccessor = new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString());
+    _baselinePath =
+        String.format(BASELINE_TEMPLATE, helixManager.getClusterName(), ASSIGNMENT_METADATA_KEY);
+    _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, helixManager.getClusterName(),
+        ASSIGNMENT_METADATA_KEY);
+  }
+
+  /**
+   * Returns the baseline assignment keyed by resource name. The first call reads it from the
+   * metadata store; later calls return the cached map (the same instance, not a copy).
+   */
public Map<String, ResourceAssignment> getBaseline() {
-    return _persistGlobalBaseline;
+    // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
+    if (_globalBaseline == null) {
+      HelixProperty baseline =
+          _dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class);
+      _globalBaseline = splitAssignments(baseline);
+    }
+    return _globalBaseline;
+  }
+
+  /**
+   * Returns the best possible assignment keyed by resource name. The first call reads it from the
+   * metadata store; later calls return the cached map (the same instance, not a copy).
+   */
+  public Map<String, ResourceAssignment> getBestPossibleAssignment() {
+    // Return the in-memory best possible assignment. If null, read from ZK. This is to minimize
+    // reads from ZK
+    if (_bestPossibleAssignment == null) {
+      HelixProperty bestPossible =
+          _dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class);
+      _bestPossibleAssignment = splitAssignments(bestPossible);
+    }
+    return _bestPossibleAssignment;
}
+
+  /**
+   * Synchronously writes the given baseline to the metadata store, then caches it in memory.
+   * The map is cached by reference, not copied.
+   * @param globalBaseline the baseline assignment keyed by resource name
+   * @throws HelixException if the write fails with an IOException
+   */
public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
-    // TODO clean up invalid items
-    _persistGlobalBaseline = globalBaseline;
+    // TODO: Make the write async?
+    // Persist to ZK
+    HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, globalBaseline);
+    try {
+      _dataAccessor.compressedBucketWrite(_baselinePath, combinedAssignments);
+    } catch (IOException e) {
+      // TODO: Improve failure handling
+      throw new HelixException("Failed to persist baseline!", e);
+    }
+
+    // Update the in-memory reference
+    _globalBaseline = globalBaseline;
}
-  public Map<String, ResourceAssignment> getBestPossibleAssignment() {
-    return _persistBestPossibleAssignment;
+
+  /**
+   * Synchronously writes the given best possible assignment to the metadata store, then caches it
+   * in memory. The map is cached by reference, not copied.
+   * @param bestPossibleAssignment the best possible assignment keyed by resource name
+   * @throws HelixException if the write fails with an IOException
+   */
+  public void persistBestPossibleAssignment(
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // TODO: Make the write async?
+    // Persist to ZK
+    HelixProperty combinedAssignments =
+        combineAssignments(BEST_POSSIBLE_KEY, bestPossibleAssignment);
+    try {
+      _dataAccessor.compressedBucketWrite(_bestPossiblePath, combinedAssignments);
+    } catch (IOException e) {
+      // TODO: Improve failure handling
+      throw new HelixException("Failed to persist best possible assignment!", e);
+    }
+
+    // Update the in-memory reference
+    _bestPossibleAssignment = bestPossibleAssignment;
+  }
+
+  /**
+   * Packs every resource's assignment into one HelixProperty: one simple field per resource,
+   * whose value is the Jackson-serialized ZNRecord of that assignment, decoded as UTF-8 text.
+   * @param name the name given to the resulting HelixProperty
+   * @param assignmentMap assignments keyed by resource name
+   * @return a single HelixProperty holding all of the assignments
+   */
+  private HelixProperty combineAssignments(String name,
+      Map<String, ResourceAssignment> assignmentMap) {
+    HelixProperty property = new HelixProperty(name);
+    // Store the serialized bytes as real text. Arrays.toString() would instead produce
+    // "[123, 34, ...]", which splitAssignments() cannot deserialize back into a ZNRecord.
+    assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource,
+        new String(SERIALIZER.serialize(assignment.getRecord()), StandardCharsets.UTF_8)));
+    return property;
}
-  public void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
-    // TODO clean up invalid items
-    _persistBestPossibleAssignment.putAll(bestPossibleAssignment);
+
+  /**
+   * Reverses {@link #combineAssignments}: decodes each simple field back into a
+   * ResourceAssignment.
+   * @param property the combined property read from the metadata store
+   * @return a map of resource name to ResourceAssignment
+   */
+  private Map<String, ResourceAssignment> splitAssignments(HelixProperty property) {
+    Map<String, ResourceAssignment> assignmentMap = new HashMap<>();
+    // Convert each resource's assignment String into a ResourceAssignment object and put it in a
+    // map. Use the same charset combineAssignments() used to produce the String.
+    property.getRecord().getSimpleFields()
+        .forEach((resource, assignment) -> assignmentMap.put(resource,
+            new ResourceAssignment(
+                (ZNRecord) SERIALIZER.deserialize(assignment.getBytes(StandardCharsets.UTF_8)))));
+    return assignmentMap;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 866c7c9..22cac7e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -51,10 +51,10 @@ import org.slf4j.LoggerFactory;
/**
* Weight-Aware Globally-Even Distribute Rebalancer.
- *
- * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
- * Design Document
- * </a>
+ * @see <a
+ * href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
+ * Design Document
+ * </a>
*/
public class WagedRebalancer {
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
@@ -62,8 +62,8 @@ public class WagedRebalancer {
// When any of the following change happens, the rebalancer needs to do a global rebalance which
// contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
- Collections.unmodifiableSet(new HashSet<>(Arrays
- .asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
+ Collections
+ .unmodifiableSet(new HashSet<>(Arrays.asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.CLUSTER_CONFIG,
HelixConstants.ChangeType.INSTANCE_CONFIG)));
// The cluster change detector is a stateful object.
@@ -73,7 +73,8 @@ public class WagedRebalancer {
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
// --------- The following fields are placeholders and need replacement. -----------//
- // TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization?
+ // TODO Shall we make the metadata store a static threadlocal object as well to avoid
+ // reinitialization?
private final AssignmentMetadataStore _assignmentMetadataStore;
private final RebalanceAlgorithm _rebalanceAlgorithm;
// ------------------------------------------------------------------------------------//
@@ -81,8 +82,8 @@ public class WagedRebalancer {
public WagedRebalancer(HelixManager helixManager) {
this(
// TODO init the metadata store according to their requirement when integrate,
- // or change to final static method if possible.
- new AssignmentMetadataStore(),
+ // or change to final static method if possible.
+ new AssignmentMetadataStore(helixManager),
// TODO parse the cluster setting
ConstraintBasedAlgorithmFactory.getInstance(),
// Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
@@ -103,14 +104,14 @@ public class WagedRebalancer {
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm) {
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());
}
/**
* Compute the new IdealStates for all the input resources. The IdealStates include both new
* partition assignment (in the listFiles) and the new replica state mapping (in the mapFields).
- *
- * @param clusterData The Cluster status data provider.
- * @param resourceMap A map containing all the rebalancing resources.
+ * @param clusterData The Cluster status data provider.
+ * @param resourceMap A map containing all the rebalancing resources.
* @param currentStateOutput The present Current States of the resources.
* @return A map of the new IdealStates with the resource name as key.
*/
@@ -124,8 +125,8 @@ public class WagedRebalancer {
IdealState is = clusterData.getIdealState(resourceEntry.getKey());
return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
&& getClass().getName().equals(is.getRebalancerClassName());
- }).collect(Collectors
- .toMap(resourceEntry -> resourceEntry.getKey(), resourceEntry -> resourceEntry.getValue()));
+ }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(),
+ resourceEntry -> resourceEntry.getValue()));
if (resourceMap.isEmpty()) {
LOG.warn("There is no valid resource to be rebalanced by {}",
@@ -140,13 +141,13 @@ public class WagedRebalancer {
Map<String, IdealState> newIdealStates = computeBestPossibleStates(clusterData, resourceMap);
// Construct the new best possible states according to the current state and target assignment.
- // Note that the new ideal state might be an intermediate state between the current state and the target assignment.
+ // Note that the new ideal state might be an intermediate state between the current state and
+ // the target assignment.
for (IdealState is : newIdealStates.values()) {
String resourceName = is.getResourceName();
// Adjust the states according to the current state.
- ResourceAssignment finalAssignment = _mappingCalculator
- .computeBestPossiblePartitionState(clusterData, is, resourceMap.get(resourceName),
- currentStateOutput);
+ ResourceAssignment finalAssignment = _mappingCalculator.computeBestPossiblePartitionState(
+ clusterData, is, resourceMap.get(resourceName), currentStateOutput);
// Clean up the state mapping fields. Use the final assignment that is calculated by the
// mapping calculator to replace them.
@@ -195,10 +196,10 @@ public class WagedRebalancer {
IdealState newIdeaState;
try {
IdealState currentIdealState = clusterData.getIdealState(resourceName);
- Map<String, Integer> statePriorityMap =
- clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
- .getStatePriorityMap();
- // Create a new IdealState instance contains the new calculated assignment in the preference list.
+ Map<String, Integer> statePriorityMap = clusterData
+ .getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+ // Create a new IdealState instance contains the new calculated assignment in the preference
+ // list.
newIdeaState = generateIdealStateWithAssignment(resourceName, currentIdealState,
newAssignment.get(resourceName), statePriorityMap);
} catch (Exception ex) {
@@ -227,9 +228,8 @@ public class WagedRebalancer {
throw new HelixRebalanceException("Failed to get the current baseline assignment.",
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
}
- Map<String, ResourceAssignment> baseline =
- calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(),
- Collections.emptyMap(), currentBaseline);
+ Map<String, ResourceAssignment> baseline = calculateAssignment(clusterData, clusterChanges,
+ resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), currentBaseline);
try {
_assignmentMetadataStore.persistBaseline(baseline);
} catch (Exception ex) {
@@ -254,9 +254,8 @@ public class WagedRebalancer {
throw new HelixRebalanceException("Failed to get the persisted assignment records.",
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
}
- Map<String, ResourceAssignment> newAssignment =
- calculateAssignment(clusterData, clusterChanges, resourceMap, activeInstances, baseline,
- prevBestPossibleAssignment);
+ Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterData, clusterChanges,
+ resourceMap, activeInstances, baseline, prevBestPossibleAssignment);
try {
// TODO Test to confirm if persisting the final assignment (with final partition states)
// would be a better option.
@@ -271,13 +270,13 @@ public class WagedRebalancer {
/**
* Generate the cluster model based on the input and calculate the optimal assignment.
- *
* @param clusterData the cluster data cache.
* @param clusterChanges the detected cluster changes.
* @param resourceMap the rebalancing resources.
* @param activeNodes the alive and enabled nodes.
* @param baseline the baseline assignment for the algorithm as a reference.
- * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a reference.
+ * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a
+ * reference.
* @return the new optimal assignment for the resources.
*/
private Map<String, ResourceAssignment> calculateAssignment(
@@ -289,9 +288,8 @@ public class WagedRebalancer {
LOG.info("Start calculating for an assignment");
ClusterModel clusterModel;
try {
- clusterModel = ClusterModelProvider
- .generateClusterModel(clusterData, resourceMap, activeNodes, clusterChanges, baseline,
- prevBestPossibleAssignment);
+ clusterModel = ClusterModelProvider.generateClusterModel(clusterData, resourceMap,
+ activeNodes, clusterChanges, baseline, prevBestPossibleAssignment);
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to generate cluster model.",
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
index 989017a..b375e80 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
@@ -27,8 +27,8 @@ import org.apache.helix.ZNRecord;
import org.codehaus.jackson.map.ObjectMapper;
/**
- * ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array using MessagePack's
- * serializer. Note that this serializer doesn't check for the size of the resulting binary.
+ * ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array using Jackson. Note that
+ * this serializer doesn't check for the size of the resulting binary.
*/
public class ZNRecordJacksonSerializer implements ZkSerializer {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index ea8c164..8b80f2d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.rebalancer.waged;
* under the License.
*/
+import org.apache.helix.HelixManager;
import org.apache.helix.model.ResourceAssignment;
import java.util.HashMap;
@@ -32,6 +33,11 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();
+  public MockAssignmentMetadataStore() {
+    // In-memory mock: the overridden accessors keep assignments in local maps and never touch
+    // ZooKeeper, so no HelixManager is supplied.
+    // NOTE(review): the superclass constructor dereferences its HelixManager argument to build a
+    // ZK accessor; passing null only works if that constructor tolerates null. Confirm.
+    super(null);
+  }
+
public Map<String, ResourceAssignment> getBaseline() {
return _persistGlobalBaseline;
}
@@ -44,7 +50,8 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
return _persistBestPossibleAssignment;
}
- public void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
+ public void persistBestPossibleAssignment(
+ Map<String, ResourceAssignment> bestPossibleAssignment) {
_persistBestPossibleAssignment = bestPossibleAssignment;
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
new file mode 100644
index 0000000..922915f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -0,0 +1,101 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ResourceAssignment;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for {@link AssignmentMetadataStore} against a real ZooKeeper-backed cluster.
+ */
+public class TestAssignmentMetadataStore extends ZkTestBase {
+  protected static final int NODE_NR = 5;
+  protected static final int START_PORT = 12918;
+  protected static final String STATE_MODEL = "MasterSlave";
+  protected static final String TEST_DB = "TestDB";
+  protected static final int _PARTITIONS = 20;
+
+  protected HelixManager _manager;
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  protected MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
+  protected ClusterControllerManager _controller;
+  protected int _replica = 3;
+
+  private AssignmentMetadataStore _store;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
+    for (int i = 0; i < NODE_NR; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
+
+    // start dummy participants
+    for (int i = 0; i < NODE_NR; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    // create AssignmentMetadataStore
+    _store = new AssignmentMetadataStore(_manager);
+  }
+
+  /**
+   * Disconnects the admin manager and stops the controller and participants started in
+   * {@link #beforeClass()}, so they do not keep running into later test classes.
+   */
+  @AfterClass
+  public void stopClusterComponents() {
+    if (_manager != null && _manager.isConnected()) {
+      _manager.disconnect();
+    }
+    if (_controller != null && _controller.isConnected()) {
+      _controller.syncStop();
+    }
+    for (MockParticipantManager participant : _participants) {
+      // Guard on isConnected(): a participant whose syncStart() failed was never started.
+      if (participant != null && participant.isConnected()) {
+        participant.syncStop();
+      }
+    }
+  }
+
+  /**
+   * TODO: Reading baseline will be empty because AssignmentMetadataStore isn't being used yet by
+   * the new rebalancer. Modify this integration test once the WAGED rebalancer
+   * starts using AssignmentMetadataStore's persist APIs.
+   * TODO: WAGED Rebalancer currently does NOT work with ZKClusterVerifier because verifier's
+   * HelixManager is null, and that causes an NPE when instantiating AssignmentMetadataStore.
+   */
+  @Test
+  public void testReadEmptyBaseline() {
+    try {
+      _store.getBaseline();
+      Assert.fail("Should fail because there shouldn't be any data.");
+    } catch (Exception e) {
+      // Expected: nothing has been persisted under the baseline path yet.
+    }
+  }
+}