You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:25 UTC
[helix] 22/37: Fix TestWagedRebalancer and add constructor in
AssignmentMetadataStore
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git
commit c719273daf4236e2a803bb0d99724e5a843f5512
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Tue Sep 10 09:42:23 2019 -0700
Fix TestWagedRebalancer and add constructor in AssignmentMetadataStore
TestWagedRebalancer was failing because it was not using a proper HelixManager to instantiate a mock version of AssignmentMetadataStore. This diff refactors the constructors in AssignmentMetadataStore and fixes the failing test.
---
.../rebalancer/waged/AssignmentMetadataStore.java | 18 +++--
.../waged/MockAssignmentMetadataStore.java | 10 +--
.../rebalancer/waged/TestWagedRebalancer.java | 93 +++++++++++++---------
3 files changed, 71 insertions(+), 50 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index bf9f292..fd655d1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -21,6 +21,8 @@ package org.apache.helix.controller.rebalancer.waged;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
@@ -31,9 +33,6 @@ import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.ResourceAssignment;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* A placeholder before we have the real assignment metadata store.
*/
@@ -51,12 +50,15 @@ public class AssignmentMetadataStore {
private Map<String, ResourceAssignment> _globalBaseline;
private Map<String, ResourceAssignment> _bestPossibleAssignment;
+ AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
+ _dataAccessor = bucketDataAccessor;
+ _baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
+ _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
+ }
+
AssignmentMetadataStore(HelixManager helixManager) {
- _dataAccessor = new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString());
- _baselinePath =
- String.format(BASELINE_TEMPLATE, helixManager.getClusterName(), ASSIGNMENT_METADATA_KEY);
- _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, helixManager.getClusterName(),
- ASSIGNMENT_METADATA_KEY);
+ this(new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()),
+ helixManager.getClusterName());
}
public Map<String, ResourceAssignment> getBaseline() {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index 8b80f2d..3371c8b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -19,11 +19,10 @@ package org.apache.helix.controller.rebalancer.waged;
* under the License.
*/
-import org.apache.helix.HelixManager;
-import org.apache.helix.model.ResourceAssignment;
-
import java.util.HashMap;
import java.util.Map;
+import org.apache.helix.BucketDataAccessor;
+import org.apache.helix.model.ResourceAssignment;
/**
* A mock up metadata store for unit test.
@@ -33,9 +32,8 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();
- public MockAssignmentMetadataStore() {
- // In-memory mock component, so pass null for HelixManager since it's not needed
- super(null);
+ MockAssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
+ super(bucketDataAccessor, clusterName);
}
public Map<String, ResourceAssignment> getBaseline() {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 6759a10..d6fd99b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -1,5 +1,24 @@
package org.apache.helix.controller.rebalancer.waged;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -7,7 +26,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-
+import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -28,13 +47,13 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
public class TestWagedRebalancer extends AbstractTestClusterModel {
private Set<String> _instances;
private MockRebalanceAlgorithm _algorithm;
+ private MockAssignmentMetadataStore _metadataStore;
@BeforeClass
public void initialize() {
@@ -42,6 +61,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
_instances = new HashSet<>();
_instances.add(_testInstanceId);
_algorithm = new MockRebalanceAlgorithm();
+
+ // Initialize a mock assignment metadata store
+ BucketDataAccessor mockAccessor = Mockito.mock(BucketDataAccessor.class);
+ String clusterName = ""; // an empty string for testing purposes
+ _metadataStore = new MockAssignmentMetadataStore(mockAccessor, clusterName);
}
@Override
@@ -88,9 +112,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
@Test
public void testRebalance() throws IOException, HelixRebalanceException {
- // Init mock metadatastore for the unit test
- MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
- WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+ _metadataStore.clearMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -111,9 +134,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
@Test(dependsOnMethods = "testRebalance")
public void testPartialRebalance() throws IOException, HelixRebalanceException {
- // Init mock metadatastore for the unit test
- MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
- WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+ _metadataStore.clearMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -127,7 +149,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Test with partial resources listed in the resourceMap input.
// Remove the first resource from the input. Note it still exists in the cluster data cache.
- metadataStore.clearMetadataStore();
+ _metadataStore.clearMetadataStore();
resourceMap.remove(_resourceNames.get(0));
Map<String, IdealState> newIdealStates =
rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
@@ -137,9 +159,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
@Test(dependsOnMethods = "testRebalance")
public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException {
- // Init mock metadatastore for the unit test
- MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
- WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+ _metadataStore.clearMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -160,9 +181,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
String resourceName = csEntry.getKey();
CurrentState cs = csEntry.getValue();
for (Map.Entry<String, String> partitionStateEntry : cs.getPartitionStateMap().entrySet()) {
- currentStateOutput
- .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()),
- instanceName, partitionStateEntry.getValue());
+ currentStateOutput.setCurrentState(resourceName,
+ new Partition(partitionStateEntry.getKey()), instanceName,
+ partitionStateEntry.getValue());
}
}
}
@@ -197,7 +218,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
@Test(dependsOnMethods = "testRebalance")
public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException {
- WagedRebalancer rebalancer = new WagedRebalancer(new MockAssignmentMetadataStore(), _algorithm);
+ _metadataStore.clearMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
ResourceControllerDataProvider clusterData = setupClusterDataCache();
String nonCompatibleResourceName = _resourceNames.get(0);
@@ -222,7 +244,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// TODO test with invalid capacity configuration which will fail the cluster model constructing.
@Test(dependsOnMethods = "testRebalance")
public void testInvalidClusterStatus() throws IOException {
- WagedRebalancer rebalancer = new WagedRebalancer(new MockAssignmentMetadataStore(), _algorithm);
+ _metadataStore.clearMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
ResourceControllerDataProvider clusterData = setupClusterDataCache();
String invalidResource = _resourceNames.get(0);
@@ -270,8 +293,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.",
HelixRebalanceException.Type.FAILED_TO_CALCULATE));
- WagedRebalancer rebalancer =
- new WagedRebalancer(new MockAssignmentMetadataStore(), badAlgorithm);
+ _metadataStore.clearMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm);
ResourceControllerDataProvider clusterData = setupClusterDataCache();
Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
@@ -294,9 +317,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Note that this test relies on the MockRebalanceAlgorithm implementation. The mock algorithm
// won't propagate any existing assignment from the cluster model.
- // Init mock metadatastore for the unit test
- MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
- WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+ _metadataStore.clearMetadataStore();
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
// 1. rebalance with baseline calculation done
// Generate the input for the rebalancer.
@@ -317,10 +339,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Since there is no special condition, the calculated IdealStates should be exactly the same
// as the mock algorithm result.
validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
- Map<String, ResourceAssignment> baseline = metadataStore.getBaseline();
+ Map<String, ResourceAssignment> baseline = _metadataStore.getBaseline();
Assert.assertEquals(baseline, algorithmResult);
Map<String, ResourceAssignment> bestPossibleAssignment =
- metadataStore.getBestPossibleAssignment();
+ _metadataStore.getBestPossibleAssignment();
Assert.assertEquals(bestPossibleAssignment, algorithmResult);
// 2. rebalance with one ideal state changed only
@@ -344,14 +366,14 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
Collections.singletonMap(changedResourceName, new Resource(changedResourceName)),
newIdealStates, partialAlgorithmResult);
// Baseline should be empty, because there is no cluster topology change.
- baseline = metadataStore.getBaseline();
+ baseline = _metadataStore.getBaseline();
Assert.assertEquals(baseline, Collections.emptyMap());
// Best possible assignment contains the new assignment of only one resource.
- bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+ bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
Assert.assertEquals(bestPossibleAssignment, partialAlgorithmResult);
// * Before the next test, recover the best possible assignment record.
- metadataStore.persistBestPossibleAssignment(algorithmResult);
+ _metadataStore.persistBestPossibleAssignment(algorithmResult);
// 3. rebalance with current state change only
// Create a new cluster data cache to simulate cluster change
@@ -373,9 +395,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Verify that only the changed resource has been included in the calculation.
validateRebalanceResult(Collections.emptyMap(), newIdealStates, algorithmResult);
// Both assignment state should be empty.
- baseline = metadataStore.getBaseline();
+ baseline = _metadataStore.getBaseline();
Assert.assertEquals(baseline, Collections.emptyMap());
- bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+ bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
Assert.assertEquals(bestPossibleAssignment, Collections.emptyMap());
// 4. rebalance with no change but best possible state record missing.
@@ -389,10 +411,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Verify that both resource has been included in the calculation.
validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
// Both assignment state should be empty since no cluster topology change.
- baseline = metadataStore.getBaseline();
+ baseline = _metadataStore.getBaseline();
Assert.assertEquals(baseline, Collections.emptyMap());
// The best possible assignment should be present.
- bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+ bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
Assert.assertEquals(bestPossibleAssignment, algorithmResult);
}
@@ -403,9 +425,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
Assert.assertTrue(newIdealStates.containsKey(resourceName));
IdealState is = newIdealStates.get(resourceName);
ResourceAssignment assignment = expectedResult.get(resourceName);
- Assert.assertEquals(is.getPartitionSet(), new HashSet<>(
- assignment.getMappedPartitions().stream().map(partition -> partition.getPartitionName())
- .collect(Collectors.toSet())));
+ Assert.assertEquals(is.getPartitionSet(), new HashSet<>(assignment.getMappedPartitions()
+ .stream().map(partition -> partition.getPartitionName()).collect(Collectors.toSet())));
for (String partitionName : is.getPartitionSet()) {
Assert.assertEquals(is.getInstanceStateMap(partitionName),
assignment.getReplicaMap(new Partition(partitionName)));