You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/05/24 19:11:32 UTC

helix git commit: [HELIX-657] Fix unexpected idealstate overwrite when persist assignment is on.

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 8ba068e7b -> 091337c42


[HELIX-657] Fix unexpected idealstate overwrite when persist assignment is on.

1. Change persist method from set to update in PersistAssignmentStage.
The new updater only overwrites map and list fields that the controller will update during PersistAssignmentStage.
Updates from other sources that are made between the controller's read and write are kept, as long as those fields are not deliberately updated by the controller.
If the current node does not exist, the new updater returns null.
2. Update the accessors that rely on an updater so that they check the new data before applying the change. If the returned new data is null, they skip updating or creating the node.

Also, add a test case for PersistAssignmentStage to cover the change.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/091337c4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/091337c4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/091337c4

Branch: refs/heads/helix-0.6.x
Commit: 091337c4249f37fd3235c9cf630a861a6abee557
Parents: 8ba068e
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Tue May 23 18:02:24 2017 -0700
Committer: Jiajun Wang <jj...@linkedin.com>
Committed: Wed May 24 12:05:57 2017 -0700

----------------------------------------------------------------------
 .../org/apache/helix/HelixDataAccessor.java     |  9 ++
 .../stages/PersistAssignmentStage.java          | 16 +++-
 .../helix/manager/zk/ZKHelixDataAccessor.java   |  7 +-
 .../helix/manager/zk/ZkBaseDataAccessor.java    | 23 +++--
 .../src/test/java/org/apache/helix/Mocks.java   | 17 +++-
 .../integration/TestPersistAssignmentStage.java | 99 ++++++++++++++++++++
 6 files changed, 159 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/091337c4/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java
index 0f48539..e7777c6 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java
@@ -60,6 +60,15 @@ public interface HelixDataAccessor {
   <T extends HelixProperty> boolean updateProperty(PropertyKey key, T value);
 
   /**
+   * Updates a property using the specified updater.
+   * @param key the key identifying the property to update
+   * @param updater an update routine for the data to merge in
+   * @param value the property value accompanying the update (NOTE(review): usage is implementation-defined; confirm against implementations)
+   * @return true if the update was successful
+   */
+  <T extends HelixProperty> boolean updateProperty(PropertyKey key, DataUpdater<ZNRecord> updater, T value);
+
+  /**
    * Return the property value, it must be refer to a single Helix Property. i.e
    * PropertyKey.isLeaf() must return true.
    * @param key

http://git-wip-us.apache.org/repos/asf/helix/blob/091337c4/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 425b38b..8255cf4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -23,9 +23,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -97,7 +99,19 @@ public class PersistAssignmentStage extends AbstractBaseStage {
         }
 
         if (needPersist) {
-          accessor.setProperty(keyBuilder.idealStates(resourceId), idealState);
+          // Update instead of set to ensure any intermediate changes that the controller does not update are kept.
+          accessor.updateProperty(keyBuilder.idealStates(resourceId), new DataUpdater<ZNRecord>() {
+            @Override
+            public ZNRecord update(ZNRecord current) {
+              if (current != null) {
+                // Overwrite MapFields and ListFields items with the same key.
+                // Note that default merge will keep old values in the maps or lists unchanged, which is not desired.
+                current.getMapFields().putAll(idealState.getRecord().getMapFields());
+                current.getListFields().putAll(idealState.getRecord().getListFields());
+              }
+              return current;
+            }
+          }, idealState);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/091337c4/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 791c6d8..d9818ca 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -157,6 +157,11 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
 
   @Override
   public <T extends HelixProperty> boolean updateProperty(PropertyKey key, T value) {
+    return updateProperty(key, new ZNRecordUpdater(value.getRecord()), value);
+  }
+
+  @Override
+  public <T extends HelixProperty> boolean updateProperty(PropertyKey key, DataUpdater<ZNRecord> updater, T value) {
     PropertyType type = key.getType();
     String path = key.getPath();
     int options = constructOptions(type);
@@ -172,7 +177,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
       }
       break;
     default:
-      success = _baseDataAccessor.update(path, new ZNRecordUpdater(value.getRecord()), options);
+      success = _baseDataAccessor.update(path, updater, options);
       break;
     }
     return success;

http://git-wip-us.apache.org/repos/asf/helix/blob/091337c4/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index d79abeb..dacecd9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -256,9 +256,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         Stat readStat = new Stat();
         T oldData = (T) _zkClient.readData(path, readStat);
         T newData = updater.update(oldData);
-        Stat setStat = _zkClient.writeDataGetStat(path, newData, readStat.getVersion());
-        DataTree.copyStat(setStat, result._stat);
-
+        if (newData != null) {
+          Stat setStat = _zkClient.writeDataGetStat(path, newData, readStat.getVersion());
+          DataTree.copyStat(setStat, result._stat);
+        }
         updatedData = newData;
       } catch (ZkBadVersionException e) {
         retry = true;
@@ -266,9 +267,15 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         // node not exist, try create, pass null to updater
         try {
           T newData = updater.update(null);
-          AccessResult res = doCreate(path, newData, options);
-          result._pathCreated.addAll(res._pathCreated);
-          RetCode rc = res._retCode;
+          RetCode rc;
+          if (newData != null) {
+            AccessResult res = doCreate(path, newData, options);
+            result._pathCreated.addAll(res._pathCreated);
+            rc = res._retCode;
+          } else {
+            // If update returns null, no need to create.
+            rc = RetCode.OK;
+          }
           switch (rc) {
           case OK:
             updatedData = newData;
@@ -810,6 +817,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
           DataUpdater<T> updater = updaters.get(i);
           T newData = updater.update(curDataList.get(i));
           newDataList.add(newData);
+          if (newData == null) {
+            // No need to create or update if the updater does not return a new version
+            continue;
+          }
           Stat curStat = curStats.get(i);
           if (curStat == null) {
             // node not exists

http://git-wip-us.apache.org/repos/asf/helix/blob/091337c4/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index c60a693..70469d8 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -508,14 +508,21 @@ public class Mocks {
 
     @Override
     public <T extends HelixProperty> boolean updateProperty(PropertyKey key, T value) {
+      return updateProperty(key, new ZNRecordUpdater(value.getRecord()) , value);
+    }
+
+    @Override
+    public <T extends HelixProperty> boolean updateProperty(PropertyKey key, DataUpdater<ZNRecord> updater, T value) {
       String path = key.getPath();
       PropertyType type = key.getType();
       if (type.updateOnlyOnExists) {
         if (data.containsKey(path)) {
           if (type.mergeOnUpdate) {
             ZNRecord znRecord = new ZNRecord(data.get(path));
-            znRecord.merge(value.getRecord());
-            data.put(path, znRecord);
+            ZNRecord newZNRecord = updater.update(znRecord);
+            if (newZNRecord != null) {
+              data.put(path, newZNRecord);
+            }
           } else {
             data.put(path, value.getRecord());
           }
@@ -524,8 +531,10 @@ public class Mocks {
         if (type.mergeOnUpdate) {
           if (data.containsKey(path)) {
             ZNRecord znRecord = new ZNRecord(data.get(path));
-            znRecord.merge(value.getRecord());
-            data.put(path, znRecord);
+            ZNRecord newZNRecord = updater.update(znRecord);
+            if (newZNRecord != null) {
+              data.put(path, znRecord);
+            }
           } else {
             data.put(path, value.getRecord());
           }

http://git-wip-us.apache.org/repos/asf/helix/blob/091337c4/helix-core/src/test/java/org/apache/helix/integration/TestPersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPersistAssignmentStage.java b/helix-core/src/test/java/org/apache/helix/integration/TestPersistAssignmentStage.java
new file mode 100644
index 0000000..25bd8a7
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPersistAssignmentStage.java
@@ -0,0 +1,99 @@
+package org.apache.helix.integration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.PersistAssignmentStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.tools.DefaultIdealStateCalculator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestPersistAssignmentStage extends ZkStandAloneCMTestBase {
+  ClusterEvent event = new ClusterEvent("sampleEvent");
+
+  /**
+   * Verifies that PersistAssignmentStage persists a new assignment while keeping an IdealState config change written in between.
+   * @throws Exception if setup or verification fails unexpectedly
+   */
+  @Test
+  public void testSimple() throws Exception {
+    int nodes = 2;
+    List<String> instances = new ArrayList<String>();
+    for (int i = 0; i < nodes; i++) {
+      instances.add("localhost_" + i);
+    }
+    int partitions = 10;
+    int replicas = 1;
+    String resourceName = "testResource";
+    ZNRecord record =
+        DefaultIdealStateCalculator.calculateIdealState(instances, partitions, replicas, resourceName, "ONLINE",
+            "OFFLINE");
+    IdealState idealState = new IdealState(record);
+    idealState.setStateModelDefRef("OnlineOffline");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+
+    // Write the initial IdealState, then run the read and resource computation stages to populate the event
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, new ResourceComputationStage());
+
+    // Ensure persist best possible assignment is true
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    cache.getClusterConfig().setPersistBestPossibleAssignment(true);
+
+    // 1. Change best possible state (simulate a new rebalancer run): every partition goes to localhost_3 as OFFLINE
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    for (String partition : idealState.getPartitionSet()) {
+      bestPossibleStateOutput.setState(resourceName, new Partition(partition), "localhost_3", "OFFLINE");
+    }
+    // 2. At the same time, set DelayRebalanceDisabled = true (simulate an admin operation happening concurrently)
+    idealState.setDelayRebalanceDisabled(true);
+    accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
+
+    // Persist new assignment
+    PersistAssignmentStage stage = new PersistAssignmentStage();
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+    runStage(event, stage);
+
+    IdealState newIdealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+    // 1. New assignment should be set
+    Assert.assertEquals(newIdealState.getPartitionSet().size(), idealState.getPartitionSet().size());
+    for (String partition : idealState.getPartitionSet()) {
+      Map<String, String> assignment = newIdealState.getInstanceStateMap(partition);
+      Assert.assertNotNull(assignment);
+      Assert.assertEquals(assignment.size(),1);
+      Assert.assertTrue(assignment.containsKey("localhost_3") && assignment.get("localhost_3").equals("OFFLINE"));
+    }
+    // 2. Admin config change (DelayRebalanceDisabled) should be kept
+    Assert.assertTrue(newIdealState.isDelayRebalanceDisabled());
+  }
+
+  private void runStage(ClusterEvent event, Stage stage) { // drives one stage: init, preProcess, process, postProcess
+    event.addAttribute("helixmanager", _manager);
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    try {
+      stage.process(event);
+    } catch (Exception e) {
+      e.printStackTrace(); // NOTE(review): the exception is printed and swallowed, so a failing stage does not fail the test by itself
+    }
+    stage.postProcess();
+  }
+}