You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/04/23 19:27:57 UTC

[helix] 11/23: Use updater to update customized state for concurrency control (#859)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit d6fd3622aa175db0f213885954ded314a9deba02
Author: Molly Gao <31...@users.noreply.github.com>
AuthorDate: Wed Mar 11 10:45:58 2020 -0700

    Use updater to update customized state for concurrency control (#859)
    
    Currently, the updateCustomizedState method is synchronized for concurrency control. This commit changes the update implementation to leave the responsibility for concurrency control to ZooKeeper by using an updater to update the customized state. Since the delete method is already implemented with an updater, this prevents unexpected changes to the customized state data.
---
 .../customizedstate/CustomizedStateProvider.java   | 23 +++++-----------------
 .../paticipant/TestCustomizedStateUpdate.java      | 21 ++++++++++----------
 2 files changed, 16 insertions(+), 28 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
index 3807ea6..80f4d91 100644
--- a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
@@ -50,7 +50,7 @@ public class CustomizedStateProvider {
    * Update a specific customized state based on the resource name and partition name. The
    * customized state is input as a single string
    */
-  public synchronized void updateCustomizedState(String customizedStateName, String resourceName,
+  public void updateCustomizedState(String customizedStateName, String resourceName,
       String partitionName, String customizedState) {
     Map<String, String> customizedStateMap = new HashMap<>();
     customizedStateMap.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), customizedState);
@@ -61,29 +61,16 @@ public class CustomizedStateProvider {
    * Update a specific customized state based on the resource name and partition name. The
    * customized state is input as a map
    */
-  public synchronized void updateCustomizedState(String customizedStateName, String resourceName,
+  public void updateCustomizedState(String customizedStateName, String resourceName,
       String partitionName, Map<String, String> customizedStateMap) {
     PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
     PropertyKey propertyKey =
         keyBuilder.customizedState(_instanceName, customizedStateName, resourceName);
     ZNRecord record = new ZNRecord(resourceName);
-    Map<String, Map<String, String>> mapFields = new HashMap<>();
-    CustomizedState existingState = getCustomizedState(customizedStateName, resourceName);
-    if (existingState != null
-        && existingState.getRecord().getMapFields().containsKey(partitionName)) {
-      Map<String, String> existingMap = new HashMap<>();
-      for (String key : customizedStateMap.keySet()) {
-        existingMap.put(key, customizedStateMap.get(key));
-      }
-
-      mapFields.put(partitionName, existingMap);
-    } else {
-      mapFields.put(partitionName, customizedStateMap);
-    }
-    record.setMapFields(mapFields);
+    record.setMapField(partitionName, customizedStateMap);
     if (!_helixDataAccessor.updateProperty(propertyKey, new CustomizedState(record))) {
-      throw new HelixException(
-          String.format("Failed to persist customized state %s to zk for instance %s, resource %s",
+      throw new HelixException(String
+          .format("Failed to persist customized state %s to zk for instance %s, resource %s",
               customizedStateName, _instanceName, record.getId()));
     }
   }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
index bc086ba..740f2fd 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
@@ -57,6 +57,8 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
   private final String PARTITION_STATE = "partitionState";
   private static HelixManager _manager;
   private static CustomizedStateProvider _mockProvider;
+  private PropertyKey _propertyKey;
+  private HelixDataAccessor _dataAccessor;
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -67,6 +69,9 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
     _participants[0].connect();
     _mockProvider = CustomizedStateProviderFactory.getInstance()
         .buildCustomizedStateProvider(_manager, _participants[0].getInstanceName());
+    _dataAccessor = _manager.getHelixDataAccessor();
+    _propertyKey = _dataAccessor.keyBuilder()
+        .customizedStates(_participants[0].getInstanceName(), CUSTOMIZE_STATE_NAME);
   }
 
   @AfterClass
@@ -77,11 +82,8 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
 
   @BeforeMethod
   public void beforeMethod() {
-    HelixDataAccessor dataAccessor = _manager.getHelixDataAccessor();
-    PropertyKey propertyKey = dataAccessor.keyBuilder()
-        .customizedStates(_participants[0].getInstanceName(), CUSTOMIZE_STATE_NAME);
-    dataAccessor.removeProperty(propertyKey);
-    CustomizedState customizedStates = dataAccessor.getProperty(propertyKey);
+    _dataAccessor.removeProperty(_propertyKey);
+    CustomizedState customizedStates = _dataAccessor.getProperty(_propertyKey);
     Assert.assertNull(customizedStates);
   }
 
@@ -278,15 +280,14 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
 
   @Test
   public void testSimultaneousUpdateCustomizedState() {
-    int n = 10;
-
     List<Callable<Boolean>> threads = new ArrayList<>();
-    for (int i = 0; i < n; i++) {
+    int threadCount = 10;
+    for (int i = 0; i < threadCount; i++) {
       threads.add(new TestSimultaneousUpdate());
     }
     Map<String, Boolean> resultMap = TestHelper.startThreadsConcurrently(threads, 1000);
-    Assert.assertEquals(resultMap.size(), n);
-    Boolean[] results = new Boolean[n];
+    Assert.assertEquals(resultMap.size(), threadCount);
+    Boolean[] results = new Boolean[threadCount];
     Arrays.fill(results, true);
     Assert.assertEqualsNoOrder(resultMap.values().toArray(), results);
   }