You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:33:00 UTC

[helix] 22/50: Fix TestWagedRebalancer and add constructor in AssignmentMetadataStore

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 80d340ef1b398fdcd0436f418884982ff3b7e878
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Tue Sep 10 09:42:23 2019 -0700

    Fix TestWagedRebalancer and add constructor in AssignmentMetadataStore
    
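    TestWagedRebalancer was failing because MockAssignmentMetadataStore passed a null HelixManager to the AssignmentMetadataStore constructor, which calls methods on it. This diff adds an AssignmentMetadataStore constructor that takes a BucketDataAccessor and a cluster name, makes the HelixManager constructor delegate to it, and fixes the failing test by building the mock store from a mocked BucketDataAccessor.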
---
 .../rebalancer/waged/AssignmentMetadataStore.java  | 18 +++--
 .../waged/MockAssignmentMetadataStore.java         | 10 +--
 .../rebalancer/waged/TestWagedRebalancer.java      | 93 +++++++++++++---------
 3 files changed, 71 insertions(+), 50 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index bf9f292..fd655d1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -21,6 +21,8 @@ package org.apache.helix.controller.rebalancer.waged;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
@@ -31,9 +33,6 @@ import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
 import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.ResourceAssignment;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * A placeholder before we have the real assignment metadata store.
  */
@@ -51,12 +50,15 @@ public class AssignmentMetadataStore {
   private Map<String, ResourceAssignment> _globalBaseline;
   private Map<String, ResourceAssignment> _bestPossibleAssignment;
 
+  AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
+    _dataAccessor = bucketDataAccessor;
+    _baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
+    _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
+  }
+
   AssignmentMetadataStore(HelixManager helixManager) {
-    _dataAccessor = new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString());
-    _baselinePath =
-        String.format(BASELINE_TEMPLATE, helixManager.getClusterName(), ASSIGNMENT_METADATA_KEY);
-    _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, helixManager.getClusterName(),
-        ASSIGNMENT_METADATA_KEY);
+    this(new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()),
+        helixManager.getClusterName());
   }
 
   public Map<String, ResourceAssignment> getBaseline() {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index 8b80f2d..3371c8b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -19,11 +19,10 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
-import org.apache.helix.HelixManager;
-import org.apache.helix.model.ResourceAssignment;
-
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.helix.BucketDataAccessor;
+import org.apache.helix.model.ResourceAssignment;
 
 /**
  * A mock up metadata store for unit test.
@@ -33,9 +32,8 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
   private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
   private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();
 
-  public MockAssignmentMetadataStore() {
-    // In-memory mock component, so pass null for HelixManager since it's not needed
-    super(null);
+  MockAssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
+    super(bucketDataAccessor, clusterName);
   }
 
   public Map<String, ResourceAssignment> getBaseline() {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 6759a10..d6fd99b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -1,5 +1,24 @@
 package org.apache.helix.controller.rebalancer.waged;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -7,7 +26,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-
+import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -28,13 +47,13 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
 
 public class TestWagedRebalancer extends AbstractTestClusterModel {
   private Set<String> _instances;
   private MockRebalanceAlgorithm _algorithm;
+  private MockAssignmentMetadataStore _metadataStore;
 
   @BeforeClass
   public void initialize() {
@@ -42,6 +61,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     _instances = new HashSet<>();
     _instances.add(_testInstanceId);
     _algorithm = new MockRebalanceAlgorithm();
+
+    // Initialize a mock assignment metadata store
+    BucketDataAccessor mockAccessor = Mockito.mock(BucketDataAccessor.class);
+    String clusterName = ""; // an empty string for testing purposes
+    _metadataStore = new MockAssignmentMetadataStore(mockAccessor, clusterName);
   }
 
   @Override
@@ -88,9 +112,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
   @Test
   public void testRebalance() throws IOException, HelixRebalanceException {
-    // Init mock metadatastore for the unit test
-    MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+    _metadataStore.clearMetadataStore();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -111,9 +134,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
   @Test(dependsOnMethods = "testRebalance")
   public void testPartialRebalance() throws IOException, HelixRebalanceException {
-    // Init mock metadatastore for the unit test
-    MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+    _metadataStore.clearMetadataStore();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -127,7 +149,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
     // Test with partial resources listed in the resourceMap input.
     // Remove the first resource from the input. Note it still exists in the cluster data cache.
-    metadataStore.clearMetadataStore();
+    _metadataStore.clearMetadataStore();
     resourceMap.remove(_resourceNames.get(0));
     Map<String, IdealState> newIdealStates =
         rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
@@ -137,9 +159,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
   @Test(dependsOnMethods = "testRebalance")
   public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException {
-    // Init mock metadatastore for the unit test
-    MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+    _metadataStore.clearMetadataStore();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -160,9 +181,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
         String resourceName = csEntry.getKey();
         CurrentState cs = csEntry.getValue();
         for (Map.Entry<String, String> partitionStateEntry : cs.getPartitionStateMap().entrySet()) {
-          currentStateOutput
-              .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()),
-                  instanceName, partitionStateEntry.getValue());
+          currentStateOutput.setCurrentState(resourceName,
+              new Partition(partitionStateEntry.getKey()), instanceName,
+              partitionStateEntry.getValue());
         }
       }
     }
@@ -197,7 +218,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
   @Test(dependsOnMethods = "testRebalance")
   public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException {
-    WagedRebalancer rebalancer = new WagedRebalancer(new MockAssignmentMetadataStore(), _algorithm);
+    _metadataStore.clearMetadataStore();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     String nonCompatibleResourceName = _resourceNames.get(0);
@@ -222,7 +244,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   // TODO test with invalid capacity configuration which will fail the cluster model constructing.
   @Test(dependsOnMethods = "testRebalance")
   public void testInvalidClusterStatus() throws IOException {
-    WagedRebalancer rebalancer = new WagedRebalancer(new MockAssignmentMetadataStore(), _algorithm);
+    _metadataStore.clearMetadataStore();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     String invalidResource = _resourceNames.get(0);
@@ -270,8 +293,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.",
         HelixRebalanceException.Type.FAILED_TO_CALCULATE));
 
-    WagedRebalancer rebalancer =
-        new WagedRebalancer(new MockAssignmentMetadataStore(), badAlgorithm);
+    _metadataStore.clearMetadataStore();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm);
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
@@ -294,9 +317,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Note that this test relies on the MockRebalanceAlgorithm implementation. The mock algorithm
     // won't propagate any existing assignment from the cluster model.
 
-    // Init mock metadatastore for the unit test
-    MockAssignmentMetadataStore metadataStore = new MockAssignmentMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+    _metadataStore.clearMetadataStore();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     // 1. rebalance with baseline calculation done
     // Generate the input for the rebalancer.
@@ -317,10 +339,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Since there is no special condition, the calculated IdealStates should be exactly the same
     // as the mock algorithm result.
     validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
-    Map<String, ResourceAssignment> baseline = metadataStore.getBaseline();
+    Map<String, ResourceAssignment> baseline = _metadataStore.getBaseline();
     Assert.assertEquals(baseline, algorithmResult);
     Map<String, ResourceAssignment> bestPossibleAssignment =
-        metadataStore.getBestPossibleAssignment();
+        _metadataStore.getBestPossibleAssignment();
     Assert.assertEquals(bestPossibleAssignment, algorithmResult);
 
     // 2. rebalance with one ideal state changed only
@@ -344,14 +366,14 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
         Collections.singletonMap(changedResourceName, new Resource(changedResourceName)),
         newIdealStates, partialAlgorithmResult);
     // Baseline should be empty, because there is no cluster topology change.
-    baseline = metadataStore.getBaseline();
+    baseline = _metadataStore.getBaseline();
     Assert.assertEquals(baseline, Collections.emptyMap());
     // Best possible assignment contains the new assignment of only one resource.
-    bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+    bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
     Assert.assertEquals(bestPossibleAssignment, partialAlgorithmResult);
 
     // * Before the next test, recover the best possible assignment record.
-    metadataStore.persistBestPossibleAssignment(algorithmResult);
+    _metadataStore.persistBestPossibleAssignment(algorithmResult);
 
     // 3. rebalance with current state change only
     // Create a new cluster data cache to simulate cluster change
@@ -373,9 +395,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Verify that only the changed resource has been included in the calculation.
     validateRebalanceResult(Collections.emptyMap(), newIdealStates, algorithmResult);
     // Both assignment state should be empty.
-    baseline = metadataStore.getBaseline();
+    baseline = _metadataStore.getBaseline();
     Assert.assertEquals(baseline, Collections.emptyMap());
-    bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+    bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
     Assert.assertEquals(bestPossibleAssignment, Collections.emptyMap());
 
     // 4. rebalance with no change but best possible state record missing.
@@ -389,10 +411,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Verify that both resource has been included in the calculation.
     validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
     // Both assignment state should be empty since no cluster topology change.
-    baseline = metadataStore.getBaseline();
+    baseline = _metadataStore.getBaseline();
     Assert.assertEquals(baseline, Collections.emptyMap());
     // The best possible assignment should be present.
-    bestPossibleAssignment = metadataStore.getBestPossibleAssignment();
+    bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
     Assert.assertEquals(bestPossibleAssignment, algorithmResult);
   }
 
@@ -403,9 +425,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
       Assert.assertTrue(newIdealStates.containsKey(resourceName));
       IdealState is = newIdealStates.get(resourceName);
       ResourceAssignment assignment = expectedResult.get(resourceName);
-      Assert.assertEquals(is.getPartitionSet(), new HashSet<>(
-          assignment.getMappedPartitions().stream().map(partition -> partition.getPartitionName())
-              .collect(Collectors.toSet())));
+      Assert.assertEquals(is.getPartitionSet(), new HashSet<>(assignment.getMappedPartitions()
+          .stream().map(partition -> partition.getPartitionName()).collect(Collectors.toSet())));
       for (String partitionName : is.getPartitionSet()) {
         Assert.assertEquals(is.getInstanceStateMap(partitionName),
             assignment.getReplicaMap(new Partition(partitionName)));