You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/04/23 19:27:54 UTC
[helix] 08/23: Improve CustomizedStateProvider tests (#840)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 5074c1579d6601d05dff10dc7fff826d77dcb082
Author: mgao0 <31...@users.noreply.github.com>
AuthorDate: Tue Mar 3 10:56:16 2020 -0800
Improve CustomizedStateProvider tests (#840)
Add test cases so that the CustomizedStateProvider tests are more comprehensive.
---
.../paticipant/TestCustomizedStateUpdate.java | 224 ++++++++++++++++++---
1 file changed, 195 insertions(+), 29 deletions(-)
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
index e0553cd..bc086ba 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
@@ -19,13 +19,22 @@ package org.apache.helix.integration.paticipant;
* under the License.
*/
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
import org.apache.helix.customizedstate.CustomizedStateProvider;
import org.apache.helix.customizedstate.CustomizedStateProviderFactory;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
@@ -33,40 +42,61 @@ import org.apache.helix.model.CustomizedState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+
public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
private static Logger LOG = LoggerFactory.getLogger(TestCustomizedStateUpdate.class);
private final String CUSTOMIZE_STATE_NAME = "testState1";
private final String PARTITION_NAME1 = "testPartition1";
private final String PARTITION_NAME2 = "testPartition2";
private final String RESOURCE_NAME = "testResource1";
+ private final String PARTITION_STATE = "partitionState";
+ private static HelixManager _manager;
+ private static CustomizedStateProvider _mockProvider;
- @Test
- public void testUpdateCustomizedState() throws Exception {
- HelixManager manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "admin",
- InstanceType.ADMINISTRATOR, ZK_ADDR);
- manager.connect();
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
_participants[0].connect();
+ _mockProvider = CustomizedStateProviderFactory.getInstance()
+ .buildCustomizedStateProvider(_manager, _participants[0].getInstanceName());
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ super.afterClass();
+ _manager.disconnect();
+ }
- HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+ @BeforeMethod
+ public void beforeMethod() {
+ HelixDataAccessor dataAccessor = _manager.getHelixDataAccessor();
PropertyKey propertyKey = dataAccessor.keyBuilder()
.customizedStates(_participants[0].getInstanceName(), CUSTOMIZE_STATE_NAME);
- CustomizedState customizedStates = manager.getHelixDataAccessor().getProperty(propertyKey);
+ dataAccessor.removeProperty(propertyKey);
+ CustomizedState customizedStates = dataAccessor.getProperty(propertyKey);
Assert.assertNull(customizedStates);
+ }
- CustomizedStateProvider mockProvider = CustomizedStateProviderFactory.getInstance()
- .buildCustomizedStateProvider(manager, _participants[0].getInstanceName());
+ @Test
+ public void testUpdateCustomizedState() {
// test adding customized state for a partition
Map<String, String> customizedStateMap = new HashMap<>();
customizedStateMap.put("PREVIOUS_STATE", "STARTED");
customizedStateMap.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
- mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
+ _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
customizedStateMap);
CustomizedState customizedState =
- mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertNotNull(customizedState);
Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
Map<String, Map<String, String>> mapView = customizedState.getRecord().getMapFields();
@@ -79,10 +109,10 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
// test partial update customized state for previous partition
Map<String, String> stateMap1 = new HashMap<>();
stateMap1.put("PREVIOUS_STATE", "END_OF_PUSH_RECEIVED");
- mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
- stateMap1);
+ _mockProvider
+ .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1, stateMap1);
- customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertNotNull(customizedState);
Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
mapView = customizedState.getRecord().getMapFields();
@@ -96,10 +126,10 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
stateMap1 = new HashMap<>();
stateMap1.put("PREVIOUS_STATE", "END_OF_PUSH_RECEIVED");
stateMap1.put("CURRENT_STATE", "COMPLETED");
- mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
- stateMap1);
+ _mockProvider
+ .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1, stateMap1);
- customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertNotNull(customizedState);
Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
mapView = customizedState.getRecord().getMapFields();
@@ -113,40 +143,176 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
Map<String, String> stateMap2 = new HashMap<>();
stateMap2.put("PREVIOUS_STATE", "STARTED");
stateMap2.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
- mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2,
- stateMap2);
+ _mockProvider
+ .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2, stateMap2);
- customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertNotNull(customizedState);
Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
mapView = customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 2);
- Assert.assertEqualsNoOrder(mapView.keySet().toArray(), new String[] {
- PARTITION_NAME1, PARTITION_NAME2
- });
+ Assert.assertEqualsNoOrder(mapView.keySet().toArray(),
+ new String[]{PARTITION_NAME1, PARTITION_NAME2});
- Map<String, String> partitionMap1 = mockProvider
+ Map<String, String> partitionMap1 = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
Assert.assertEquals(partitionMap1.keySet().size(), 2);
Assert.assertEquals(partitionMap1.get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED");
Assert.assertEquals(partitionMap1.get("CURRENT_STATE"), "COMPLETED");
- Map<String, String> partitionMap2 = mockProvider
+ Map<String, String> partitionMap2 = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2);
Assert.assertEquals(partitionMap2.keySet().size(), 2);
Assert.assertEquals(partitionMap2.get("PREVIOUS_STATE"), "STARTED");
Assert.assertEquals(partitionMap2.get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED");
// test delete customized state for a partition
- mockProvider.deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
- PARTITION_NAME1);
- customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ _mockProvider
+ .deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
+ customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
Assert.assertNotNull(customizedState);
Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
mapView = customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 1);
Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME2);
+ }
+
+ @Test
+ public void testUpdateSinglePartitionCustomizedState() {
+ _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
+ PARTITION_STATE);
+
+ // get customized state
+ CustomizedState customizedState =
+ _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ Assert.assertEquals(
+ customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE)
+ .size(), 1);
+ Map<String, String> map = new HashMap<>();
+ map.put(PARTITION_NAME1, null);
+ Assert.assertEquals(customizedState
+ .getPartitionStateMap(CustomizedState.CustomizedStateProperty.PREVIOUS_STATE), map);
+ Assert.assertEquals(
+ customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.START_TIME),
+ map);
+ Assert.assertEquals(
+ customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.END_TIME),
+ map);
+ Assert.assertEquals(customizedState.getState(PARTITION_NAME1), PARTITION_STATE);
+ Assert.assertNull(customizedState.getState(PARTITION_NAME2));
+ Assert.assertTrue(customizedState.isValid());
+
+ // get per partition customized state
+ map = new HashMap<>();
+ map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), PARTITION_STATE);
+ Map<String, String> partitionCustomizedState = _mockProvider
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
+ Assert.assertEquals(partitionCustomizedState, map);
+ Assert.assertNull(_mockProvider
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
+ }
+
+ @Test
+ public void testUpdateSinglePartitionCustomizedStateWithNullField() {
+ _mockProvider
+ .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1, (String) null);
+
+ // get customized state
+ CustomizedState customizedState =
+ _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ Map<String, String> map = new HashMap<>();
+ map.put(PARTITION_NAME1, null);
+ Assert.assertEquals(
+ customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE),
+ map);
+ Assert.assertEquals(customizedState.getState(PARTITION_NAME1), null);
+ Assert.assertTrue(customizedState.isValid());
+
+ // get per partition customized state
+ map = new HashMap<>();
+ map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), null);
+ Map<String, String> partitionCustomizedState = _mockProvider
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
+ Assert.assertEquals(partitionCustomizedState, map);
+ Assert.assertNull(_mockProvider
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
+ }
+
+ @Test
+ public void testUpdateCustomizedStateWithEmptyMap() {
+ _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
+ new HashMap<>());
+
+ // get customized state
+ CustomizedState customizedState =
+ _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ Assert.assertNull(customizedState.getState(PARTITION_NAME1));
+ Map<String, String> partitionStateMap =
+ customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE);
+ Assert.assertNotNull(partitionStateMap);
+ Assert.assertTrue(partitionStateMap.containsKey(PARTITION_NAME1));
+ Assert.assertNull(partitionStateMap.get(PARTITION_NAME1));
+ Assert.assertNull(customizedState.getState(PARTITION_NAME1));
+ Assert.assertFalse(partitionStateMap.containsKey(PARTITION_NAME2));
+ Assert.assertTrue(customizedState.isValid());
+
+ // get per partition customized state
+ Map<String, String> partitionCustomizedState = _mockProvider
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
+ Assert.assertEquals(partitionCustomizedState.size(), 0);
+ Assert.assertNull(_mockProvider
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
+ }
+
+ @Test
+ public void testDeleteNonExistingPerPartitionCustomizedState() {
+ _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
+ PARTITION_STATE);
+ _mockProvider
+ .deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2);
+ Assert.assertNotNull(_mockProvider
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1));
+ Assert.assertNull(_mockProvider
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
+ }
+
+ @Test
+ public void testSimultaneousUpdateCustomizedState() {
+ int n = 10;
+
+ List<Callable<Boolean>> threads = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ threads.add(new TestSimultaneousUpdate());
+ }
+ Map<String, Boolean> resultMap = TestHelper.startThreadsConcurrently(threads, 1000);
+ Assert.assertEquals(resultMap.size(), n);
+ Boolean[] results = new Boolean[n];
+ Arrays.fill(results, true);
+ Assert.assertEqualsNoOrder(resultMap.values().toArray(), results);
+ }
+
+ private static class TestSimultaneousUpdate implements Callable<Boolean> {
+ private Random rand = new Random();
- manager.disconnect();
+ @Override
+ public Boolean call() {
+ String customizedStateName = "testState";
+ String resourceName = "resource" + String.valueOf(rand.nextInt(10));
+ String partitionName = "partition" + String.valueOf(rand.nextInt(10));
+ String partitionState = "Updated";
+ try {
+ _mockProvider.updateCustomizedState(customizedStateName, resourceName, partitionName,
+ partitionState);
+ } catch (Exception e) {
+ return false;
+ }
+ Map<String, String> states = _mockProvider
+ .getPerPartitionCustomizedState(customizedStateName, resourceName, partitionName);
+ if (states == null) {
+ return false;
+ }
+ return states.get(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name())
+ .equals(partitionState);
+ }
}
}