You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/03/31 18:09:00 UTC

[helix] branch master updated: Refresh live instance while fetching the current state information in the RoutingTableProvider. (#920)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4355430  Refresh live instance while fetching the current state information in the RoutingTableProvider. (#920)
4355430 is described below

commit 43554304ab2d36ab344b555d3efccdd90b3b1f13
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Tue Mar 31 11:08:52 2020 -0700

    Refresh live instance while fetching the current state information in the RoutingTableProvider. (#920)
    
    This change is a workaround to fix helix-919.
    The additional read is to prevent the provider from reading the stale current states.
---
 .../apache/helix/spectator/RoutingDataCache.java   |  11 ++
 .../TestRoutingTableProviderFromCurrentStates.java | 174 +++++++++++++++++++--
 2 files changed, 169 insertions(+), 16 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index b5fdd72..7af0d28 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -79,6 +79,17 @@ class RoutingDataCache extends BasicClusterDataCache {
         .get(HelixConstants.ChangeType.CURRENT_STATE)) {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, false);
+      /**
+       * Workaround of https://github.com/apache/helix/issues/919.
+       * Why is it a workaround?
+       * 1. Before a larger scale refactoring, to minimize the impact on cache logic, this change
+       * introduces extra read to update the liveInstance list before processing current states.
+       * 2. This change does not handle the corresponding callback handlers, which should also be
+       * registered when a new liveInstance node is found.
+       * TODO: Refactor cache processing logic and also refine the callback handler registration
+       * TODO: logic.
+       **/
+      _liveInstancePropertyCache.refresh(accessor);
       Map<String, LiveInstance> liveInstanceMap = getLiveInstances();
       _currentStateCache.refresh(accessor, liveInstanceMap);
       LOG.info("Reload CurrentStates. Takes " + (System.currentTimeMillis() - start) + " ms");
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
index 37c7a1f..ab88cea 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
@@ -32,14 +33,18 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
@@ -77,8 +82,8 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
       _participants[i].syncStart();
     }
 
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
-        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
     _manager.connect();
 
     String controllerName = CONTROLLER_PREFIX + "_0";
@@ -133,7 +138,7 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
 
       IdealState idealState1 =
           _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db1);
-      validate(idealState1, routingTableEV, routingTableCurrentStates);
+      validate(idealState1, routingTableEV, routingTableCurrentStates, 0);
 
       // add new DB
       String db2 = "TestDB-2";
@@ -149,7 +154,7 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
 
       IdealState idealState2 =
           _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db2);
-      validate(idealState2, routingTableEV, routingTableCurrentStates);
+      validate(idealState2, routingTableEV, routingTableCurrentStates, 0);
 
       // shutdown an instance
       startTime = System.currentTimeMillis();
@@ -159,8 +164,8 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
       validatePropagationLatency(PropertyType.CURRENTSTATES,
           System.currentTimeMillis() - startTime);
 
-      validate(idealState1, routingTableEV, routingTableCurrentStates);
-      validate(idealState2, routingTableEV, routingTableCurrentStates);
+      validate(idealState1, routingTableEV, routingTableCurrentStates, 0);
+      validate(idealState2, routingTableEV, routingTableCurrentStates, 0);
     } finally {
       routingTableEV.shutdown();
       routingTableCurrentStates.shutdown();
@@ -182,9 +187,94 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
     }, 1000));
   }
 
-  @Test(dependsOnMethods = {
-      "testRoutingTableWithCurrentStates"
-  })
+  @Test(dependsOnMethods = "testRoutingTableWithCurrentStates")
+  public void TestInconsistentStateEventProcessing() throws Exception {
+    // This test requires an additional HelixManager since event processing in one of the
+    // providers will be blocked.
+    HelixManager helixManager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, TestHelper.getTestMethodName(), InstanceType.SPECTATOR,
+            ZK_ADDR);
+    helixManager.connect();
+    RoutingTableProvider routingTableEV = null;
+    BlockingCurrentStateRoutingTableProvider routingTableCS = null;
+    int shutdownParticipantIndex = -1;
+
+    try {
+      // Prepare test environment
+      routingTableEV = new RoutingTableProvider(helixManager, PropertyType.EXTERNALVIEW);
+      routingTableCS = new BlockingCurrentStateRoutingTableProvider(_manager);
+      // Ensure the current state routing table provider has been refreshed.
+      for (String resourceName : _gSetupTool.getClusterManagementTool()
+          .getResourcesInCluster(CLUSTER_NAME)) {
+        IdealState idealState = _gSetupTool.getClusterManagementTool()
+            .getResourceIdealState(CLUSTER_NAME, resourceName);
+        validate(idealState, routingTableEV, routingTableCS, 3000);
+      }
+
+      // Blocking the current state event processing
+      routingTableCS.setBlocking(true);
+      // 1. Create a new resource and wait until all components process the change.
+      // Note that since the processing of routingTableCS has been blocked, it will be stuck on the
+      // current state change.
+      String db = "TestDB-" + TestHelper.getTestMethodName();
+      _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, NUM_PARTITIONS, "MasterSlave",
+          IdealState.RebalanceMode.FULL_AUTO.name());
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, NUM_REPLICAS);
+      ZkHelixClusterVerifier clusterVerifier =
+          new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+      Assert.assertTrue(clusterVerifier.verifyByPolling(5000, 500));
+      // 2. Process one event, so the current state will be refreshed with the new DB partitions
+      routingTableCS.proceedNewEventHandling();
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      String targetPartitionName = idealState.getPartitionSet().iterator().next();
+      // Wait until the routing table is updated.
+      BlockingCurrentStateRoutingTableProvider finalRoutingTableCS = routingTableCS;
+      Assert.assertTrue(TestHelper.verify(
+          () -> finalRoutingTableCS.getInstances(db, targetPartitionName, "MASTER").size() > 0,
+          2000));
+      String targetNodeName =
+          routingTableCS.getInstances(db, targetPartitionName, "MASTER").get(0).getInstanceName();
+      // 3. Shut down one of the instances that contains a master partition
+      for (int i = 0; i < _participants.length; i++) {
+        if (_participants[i].getInstanceName().equals(targetNodeName)) {
+          shutdownParticipantIndex = i;
+          _participants[i].syncStop();
+        }
+      }
+      Assert.assertTrue(clusterVerifier.verifyByPolling());
+      // 4. Process one of the stale current state events.
+      // The expectation is that the provider will refresh with all the latest data, including the
+      // live instance change, even though the corresponding ZK event has not been processed yet.
+      routingTableCS.proceedNewEventHandling();
+      validate(idealState, routingTableEV, routingTableCS, 3000);
+      // 5. Unblock the event processing and let the provider process all events.
+      // The expectation is that, the eventual routing tables are still the same.
+      routingTableCS.setBlocking(false);
+      routingTableCS.proceedNewEventHandling();
+      // Wait for a short while so the router will process at least one event.
+      Thread.sleep(500);
+      // Confirm that 2 providers match eventually
+      validate(idealState, routingTableEV, routingTableCS, 2000);
+    } finally {
+      if (routingTableCS != null) {
+        routingTableCS.setBlocking(false);
+        routingTableCS.proceedNewEventHandling();
+        routingTableCS.shutdown();
+      }
+      if (routingTableEV != null) {
+        routingTableEV.shutdown();
+      }
+      if (shutdownParticipantIndex >= 0) {
+        String participantName = _participants[shutdownParticipantIndex].getInstanceName();
+        _participants[shutdownParticipantIndex] =
+            new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
+        _participants[shutdownParticipantIndex].syncStart();
+      }
+    }
+  }
+
+  @Test(dependsOnMethods = { "TestInconsistentStateEventProcessing" })
   public void testWithSupportSourceDataType() {
     new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW).shutdown();
     new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW).shutdown();
@@ -199,6 +289,12 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
   }
 
   private void validate(IdealState idealState, RoutingTableProvider routingTableEV,
+      RoutingTableProvider routingTableCurrentStates, int timeout) throws Exception {
+    Assert.assertTrue(TestHelper
+        .verify(() -> compare(idealState, routingTableEV, routingTableCurrentStates), timeout));
+  }
+
+  private boolean compare(IdealState idealState, RoutingTableProvider routingTableEV,
       RoutingTableProvider routingTableCurrentStates) {
     String db = idealState.getResourceName();
     Set<String> partitions = idealState.getPartitionSet();
@@ -207,17 +303,63 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
           routingTableEV.getInstancesForResource(db, partition, "MASTER");
       List<InstanceConfig> masterInsCs =
           routingTableCurrentStates.getInstancesForResource(db, partition, "MASTER");
-      Assert.assertEquals(masterInsEv.size(), 1);
-      Assert.assertEquals(masterInsCs.size(), 1);
-      Assert.assertEquals(masterInsCs, masterInsEv);
-
+      if (masterInsEv.size() != 1 || masterInsCs.size() != 1 || !masterInsCs.equals(masterInsEv)) {
+        return false;
+      }
       List<InstanceConfig> slaveInsEv =
           routingTableEV.getInstancesForResource(db, partition, "SLAVE");
       List<InstanceConfig> slaveInsCs =
           routingTableCurrentStates.getInstancesForResource(db, partition, "SLAVE");
-      Assert.assertEquals(slaveInsEv.size(), 2);
-      Assert.assertEquals(slaveInsCs.size(), 2);
-      Assert.assertEquals(new HashSet(slaveInsCs), new HashSet(slaveInsEv));
+      if (slaveInsEv.size() != 2 || slaveInsCs.size() != 2 || !new HashSet(slaveInsCs)
+          .equals(new HashSet(slaveInsEv))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  static class BlockingCurrentStateRoutingTableProvider extends RoutingTableProvider {
+    private final Semaphore newEventHandlingCount = new Semaphore(0);
+    private boolean _isBlocking = false;
+
+    public BlockingCurrentStateRoutingTableProvider(HelixManager manager) {
+      super(manager, PropertyType.CURRENTSTATES);
+    }
+
+    public void setBlocking(boolean isBlocking) {
+      _isBlocking = isBlocking;
+    }
+
+    void proceedNewEventHandling() {
+      newEventHandlingCount.release();
+    }
+
+    @Override
+    @PreFetch(enabled = false)
+    public void onStateChange(String instanceName, List<CurrentState> statesInfo,
+        NotificationContext changeContext) {
+      if (_isBlocking) {
+        try {
+          newEventHandlingCount.acquire();
+        } catch (InterruptedException e) {
+          throw new HelixException("Failed to acquire handling lock for testing.");
+        }
+      }
+      super.onStateChange(instanceName, statesInfo, changeContext);
+    }
+
+    @Override
+    @PreFetch(enabled = true)
+    public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+        NotificationContext changeContext) {
+      if (_isBlocking) {
+        try {
+          newEventHandlingCount.acquire();
+        } catch (InterruptedException e) {
+          throw new HelixException("Failed to acquire handling lock for testing.");
+        }
+      }
+      super.onLiveInstanceChange(liveInstances, changeContext);
     }
   }
 }