You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/11/02 00:11:45 UTC

[1/3] helix git commit: Fix redundant DROPPED message sent to participant

Repository: helix
Updated Branches:
  refs/heads/master befb1036f -> 73c3f0ad8


Fix redundant DROPPED message sent to participant

It was caused by a combination of two pieces of Helix logic:

1. Helix caches the best possible mapping and won't recompute it unless there are changes to IdealState, LiveInstance, ResourceConfig or InstanceConfig.
2. In message generation, if the current state does not exist, Helix assumes the partition is in the INITIAL (OFFLINE) state.

This change applies two fixes:

1. If the current state is null and the target state is DROPPED, Helix no longer sends an OFFLINE -> DROPPED message.
2. If a recurring OFFLINE -> DROPPED message is detected, Helix clears the cached best possible mapping for that resource so that it is recomputed.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1e1cc41e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1e1cc41e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1e1cc41e

Branch: refs/heads/master
Commit: 1e1cc41ea1c2d911ee8d010495121bd73e5fb4d2
Parents: befb103
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Oct 17 16:26:38 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 17:10:35 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     |  8 +++
 .../stages/MessageGenerationPhase.java          |  8 +++
 .../controller/TestRedundantDroppedMessage.java | 72 ++++++++++++++++++++
 .../paticipant/TestStateTransitionTimeout.java  |  2 +-
 .../TestStateTransitionTimeoutWithResource.java |  2 +-
 5 files changed, 90 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 6de6d51..08e98cc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -871,6 +871,14 @@ public class ClusterDataCache extends AbstractDataCache {
   }
 
   /**
+   * Invalid the cached resourceAssignment (ideal mapping) for a resource
+   * @param resource
+   */
+  public void invalidCachedIdealStateMapping(String resource) {
+    _idealMappingCache.remove(resource);
+  }
+
+  /**
    * Get cached idealmapping
    * @return
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index b1013d1..c829082 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.SystemPropertyKeys;
@@ -150,6 +151,13 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
             currentStateOutput.getCurrentState(resourceName, partition, instanceName);
         if (currentState == null) {
           currentState = stateModelDef.getInitialState();
+          if (desiredState.equals(HelixDefinedState.DROPPED.name())) {
+            LogUtil.logDebug(logger, _eventId, String
+                .format("No current state for partition %s in resource %s, skip the drop message",
+                    partition.getPartitionName(), resourceName));
+            cache.invalidCachedIdealStateMapping(resourceName);
+            continue;
+          }
         }
 
         Message pendingMessage =

http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
new file mode 100644
index 0000000..438ad4f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
@@ -0,0 +1,72 @@
+package org.apache.helix.integration.controller;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.MessageOutput;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.task.TaskSynchronizedTestBase;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRedundantDroppedMessage extends TaskSynchronizedTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numNodes = 2;
+    _numReplicas = 1;
+    _numDbs = 1;
+    _numPartitions = 1;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testNoRedundantDropMessage() throws Exception {
+    String resourceName = "TEST_RESOURCE";
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, resourceName, 1, "MasterSlave",
+        IdealState.RebalanceMode.CUSTOMIZED.name());
+    String partitionName = "P_0";
+    ClusterEvent event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.Unknown, "ID");
+    ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
+    cache.refresh(_manager.getHelixDataAccessor());
+    IdealState idealState = cache.getIdealState(resourceName);
+    idealState.setReplicas("2");
+    Map<String, String> stateMap = new HashMap<>();
+    stateMap.put(_participants[0].getInstanceName(), "SLAVE");
+    stateMap.put(_participants[1].getInstanceName(), "DROPPED");
+    idealState.setInstanceStateMap(partitionName, stateMap);
+
+    cache.setIdealStates(Arrays.asList(idealState));
+    cache.setCachedIdealMapping(idealState.getResourceName(), idealState.getRecord());
+
+    event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+    event.addAttribute(AttributeName.helixmanager.name(), _manager);
+
+    runStage(event, new ResourceComputationStage());
+    runStage(event, new CurrentStateComputationStage());
+    runStage(event, new BestPossibleStateCalcStage());
+    runStage(event, new IntermediateStateCalcStage());
+    Assert.assertEquals(cache.getCachedIdealMapping().size(), 1);
+    runStage(event, new ResourceMessageGenerationPhase());
+
+    MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_ALL.name());
+    Assert
+        .assertEquals(messageOutput.getMessages(resourceName, new Partition(partitionName)).size(),
+            1);
+    Assert.assertEquals(cache.getCachedIdealMapping().size(), 0);
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
index 2f562e1..74cf9a2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
@@ -160,7 +160,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
 
     boolean result =
         ClusterStateVerifier
-            .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+            .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
     HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
index bf3c84e..cd8f882 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
@@ -189,7 +189,7 @@ public class TestStateTransitionTimeoutWithResource extends ZkStandAloneCMTestBa
     _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME, TEST_DB + 1, true);
     boolean result =
         ClusterStateVerifier
-            .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+            .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
     verify(TEST_DB + 1);
   }


[3/3] helix git commit: Fix external view does not remove old ones

Posted by jx...@apache.org.
Fix external view does not remove old ones

The locally cached external views may not include entries for resources whose ideal state and current states have already been removed. When the controller switches, it must read the external views from Helix at least once.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/73c3f0ad
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/73c3f0ad
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/73c3f0ad

Branch: refs/heads/master
Commit: 73c3f0ad81c4ef76cbd1e07d9368fe652099e56c
Parents: 0f0d417
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Oct 29 16:13:48 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 17:10:46 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 21 +++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/73c3f0ad/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 08e98cc..9311ca2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -87,8 +87,8 @@ public class ClusterDataCache extends AbstractDataCache {
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap = new HashMap<>();
   private Map<String, Map<String, String>> _lastTopStateLocationMap = new HashMap<>();
-  private Map<String, ExternalView> _targetExternalViewMap = new HashMap<>();
-  private Map<String, ExternalView> _externalViewMap = new HashMap<>();
+  private Map<String, ExternalView> _targetExternalViewMap;
+  private Map<String, ExternalView> _externalViewMap;
   private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
   private Set<String> _disabledInstanceSet = new HashSet<>();
   private String _eventId = "NO_ID";
@@ -261,6 +261,15 @@ public class ClusterDataCache extends AbstractDataCache {
 
     updateDisabledInstances();
 
+    if (_externalViewMap == null) {
+      _externalViewMap = accessor.getChildValuesMap(accessor.keyBuilder().externalViews());
+    }
+
+    if (_clusterConfig.isTargetExternalViewEnabled() && _targetExternalViewMap == null) {
+      _targetExternalViewMap =
+          accessor.getChildValuesMap(accessor.keyBuilder().targetExternalViews());
+    }
+
     long endTime = System.currentTimeMillis();
     LogUtil.logInfo(LOG, _eventId,
         "END: ClusterDataCache.refresh() for cluster " + getClusterName() + ", took "
@@ -782,7 +791,7 @@ public class ClusterDataCache extends AbstractDataCache {
   }
 
   public ExternalView getTargetExternalView(String resourceName) {
-    return _targetExternalViewMap.get(resourceName);
+    return _targetExternalViewMap == null ? null : _targetExternalViewMap.get(resourceName);
   }
 
   public void updateTargetExternalView(String resourceName, ExternalView targetExternalView) {
@@ -794,6 +803,9 @@ public class ClusterDataCache extends AbstractDataCache {
    * @return
    */
   public Map<String, ExternalView> getExternalViews() {
+    if (_externalViewMap == null) {
+      return Collections.emptyMap();
+    }
     return Collections.unmodifiableMap(_externalViewMap);
   }
 
@@ -802,6 +814,9 @@ public class ClusterDataCache extends AbstractDataCache {
    * @param externalViews
    */
   public void updateExternalViews(List<ExternalView> externalViews) {
+    if (_externalViewMap == null) {
+      _externalViewMap = new HashMap<>();
+    }
     for (ExternalView externalView : externalViews) {
       _externalViewMap.put(externalView.getResourceName(), externalView);
     }


[2/3] helix git commit: Fix unstable test disable partition

Posted by jx...@apache.org.
Fix unstable test disable partition


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0f0d4177
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0f0d4177
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0f0d4177

Branch: refs/heads/master
Commit: 0f0d4177e830f9f1f3a5f26abeae0b366e9b1863
Parents: 1e1cc41
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Oct 25 10:55:19 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 17:10:41 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/integration/manager/TestZkHelixAdmin.java     | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0f0d4177/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
index c2da24d..5141a8d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
@@ -60,6 +60,8 @@ public class TestZkHelixAdmin extends TaskTestBase {
   public void testEnableDisablePartitions() throws InterruptedException {
     _admin.enablePartition(false, CLUSTER_NAME, (PARTICIPANT_PREFIX + "_" + _startPort),
         WorkflowGenerator.DEFAULT_TGT_DB, Arrays.asList(new String[] { "TestDB_0", "TestDB_2" }));
+    _admin.enablePartition(false, CLUSTER_NAME, (PARTICIPANT_PREFIX + "_" + (_startPort + 1)),
+        WorkflowGenerator.DEFAULT_TGT_DB, Arrays.asList(new String[] { "TestDB_0", "TestDB_2" }));
 
     IdealState idealState =
         _admin.getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);