You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/03/31 18:09:00 UTC
[helix] branch master updated: Refresh live instance while fetching
the current state information in the RoutingTableProvider. (#920)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 4355430 Refresh live instance while fetching the current state information in the RoutingTableProvider. (#920)
4355430 is described below
commit 43554304ab2d36ab344b555d3efccdd90b3b1f13
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Tue Mar 31 11:08:52 2020 -0700
Refresh live instance while fetching the current state information in the RoutingTableProvider. (#920)
This change is a workaround for https://github.com/apache/helix/issues/919.
The additional read of the live instances prevents the provider from reading stale current states.
---
.../apache/helix/spectator/RoutingDataCache.java | 11 ++
.../TestRoutingTableProviderFromCurrentStates.java | 174 +++++++++++++++++++--
2 files changed, 169 insertions(+), 16 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index b5fdd72..7af0d28 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -79,6 +79,17 @@ class RoutingDataCache extends BasicClusterDataCache {
.get(HelixConstants.ChangeType.CURRENT_STATE)) {
long start = System.currentTimeMillis();
_propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, false);
+ /**
+ * Workaround of https://github.com/apache/helix/issues/919.
+ * Why it is workaround?
+ * 1. Before a larger scale refactoring, to minimize the impact on cache logic, this change
+ * introduces extra read to update the liveInstance list before processing current states.
+ * 2. This change does not handle the corresponding callback handlers, which should also be
+ * registered when a new liveInstance node is found.
+ * TODO: Refactor cache processing logic and also refine the callback handler registration
+ * TODO: logic.
+ **/
+ _liveInstancePropertyCache.refresh(accessor);
Map<String, LiveInstance> liveInstanceMap = getLiveInstances();
_currentStateCache.refresh(accessor, liveInstanceMap);
LOG.info("Reload CurrentStates. Takes " + (System.currentTimeMillis() - start) + " ms");
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
index 37c7a1f..ab88cea 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Semaphore;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
@@ -32,14 +33,18 @@ import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
@@ -77,8 +82,8 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
_participants[i].syncStart();
}
- _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
- InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
_manager.connect();
String controllerName = CONTROLLER_PREFIX + "_0";
@@ -133,7 +138,7 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
IdealState idealState1 =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db1);
- validate(idealState1, routingTableEV, routingTableCurrentStates);
+ validate(idealState1, routingTableEV, routingTableCurrentStates, 0);
// add new DB
String db2 = "TestDB-2";
@@ -149,7 +154,7 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
IdealState idealState2 =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db2);
- validate(idealState2, routingTableEV, routingTableCurrentStates);
+ validate(idealState2, routingTableEV, routingTableCurrentStates, 0);
// shutdown an instance
startTime = System.currentTimeMillis();
@@ -159,8 +164,8 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
validatePropagationLatency(PropertyType.CURRENTSTATES,
System.currentTimeMillis() - startTime);
- validate(idealState1, routingTableEV, routingTableCurrentStates);
- validate(idealState2, routingTableEV, routingTableCurrentStates);
+ validate(idealState1, routingTableEV, routingTableCurrentStates, 0);
+ validate(idealState2, routingTableEV, routingTableCurrentStates, 0);
} finally {
routingTableEV.shutdown();
routingTableCurrentStates.shutdown();
@@ -182,9 +187,94 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
}, 1000));
}
- @Test(dependsOnMethods = {
- "testRoutingTableWithCurrentStates"
- })
+  @Test(dependsOnMethods = "testRoutingTableWithCurrentStates")
+  public void TestInconsistentStateEventProcessing() throws Exception {
+    // This test needs an additional HelixManager since one provider's event processing will be
+    // blocked. It is disconnected in the finally block so its ZK connection does not leak.
+    HelixManager helixManager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+        TestHelper.getTestMethodName(), InstanceType.SPECTATOR, ZK_ADDR);
+    helixManager.connect();
+    RoutingTableProvider routingTableEV = null;
+    BlockingCurrentStateRoutingTableProvider routingTableCS = null;
+    int shutdownParticipantIndex = -1;
+
+    try {
+      // Prepare the test environment.
+      routingTableEV = new RoutingTableProvider(helixManager, PropertyType.EXTERNALVIEW);
+      routingTableCS = new BlockingCurrentStateRoutingTableProvider(_manager);
+      // Ensure the current state routing table provider has been refreshed.
+      for (String resourceName : _gSetupTool.getClusterManagementTool()
+          .getResourcesInCluster(CLUSTER_NAME)) {
+        IdealState idealState = _gSetupTool.getClusterManagementTool()
+            .getResourceIdealState(CLUSTER_NAME, resourceName);
+        validate(idealState, routingTableEV, routingTableCS, 3000);
+      }
+
+      // Block the event processing of routingTableCS.
+      routingTableCS.setBlocking(true);
+      // 1. Create a new resource and wait until all components process the change. Since its
+      // processing is blocked, routingTableCS will be stuck on the current state change.
+      String db = "TestDB-" + TestHelper.getTestMethodName();
+      _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, NUM_PARTITIONS, "MasterSlave",
+          IdealState.RebalanceMode.FULL_AUTO.name());
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, NUM_REPLICAS);
+      ZkHelixClusterVerifier clusterVerifier =
+          new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+      Assert.assertTrue(clusterVerifier.verifyByPolling(5000, 500));
+      // 2. Process one event, so the current state will be refreshed with the new DB partitions.
+      routingTableCS.proceedNewEventHandling();
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      String targetPartitionName = idealState.getPartitionSet().iterator().next();
+      // Wait until the routing table is updated.
+      BlockingCurrentStateRoutingTableProvider finalRoutingTableCS = routingTableCS;
+      Assert.assertTrue(TestHelper.verify(
+          () -> finalRoutingTableCS.getInstances(db, targetPartitionName, "MASTER").size() > 0,
+          2000));
+      String targetNodeName =
+          routingTableCS.getInstances(db, targetPartitionName, "MASTER").get(0).getInstanceName();
+      // 3. Shut down the instance that hosts the MASTER replica of the target partition.
+      for (int i = 0; i < _participants.length; i++) {
+        if (_participants[i].getInstanceName().equals(targetNodeName)) {
+          shutdownParticipantIndex = i;
+          _participants[i].syncStop();
+          break;
+        }
+      }
+      Assert.assertTrue(clusterVerifier.verifyByPolling());
+      // 4. Process one of the stale current state events. The provider is expected to refresh all
+      // the latest data, including the live instance change, even though the corresponding ZK
+      // event has not been processed yet.
+      routingTableCS.proceedNewEventHandling();
+      validate(idealState, routingTableEV, routingTableCS, 3000);
+      // 5. Unblock the event processing and let the provider process all remaining events.
+      // The expectation is that the eventual routing tables are still the same.
+      routingTableCS.setBlocking(false);
+      routingTableCS.proceedNewEventHandling();
+      // Wait for a short while so the router will process at least one event.
+      Thread.sleep(500);
+      // Confirm that the 2 providers match eventually.
+      validate(idealState, routingTableEV, routingTableCS, 2000);
+    } finally {
+      if (routingTableCS != null) {
+        routingTableCS.setBlocking(false);
+        routingTableCS.proceedNewEventHandling();
+        routingTableCS.shutdown();
+      }
+      if (routingTableEV != null) {
+        routingTableEV.shutdown();
+      }
+      helixManager.disconnect();
+      if (shutdownParticipantIndex >= 0) {
+        String participantName = _participants[shutdownParticipantIndex].getInstanceName();
+        _participants[shutdownParticipantIndex] =
+            new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
+        _participants[shutdownParticipantIndex].syncStart();
+      }
+    }
+  }
+
+ @Test(dependsOnMethods = { "TestInconsistentStateEventProcessing" })
public void testWithSupportSourceDataType() {
new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW).shutdown();
new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW).shutdown();
@@ -199,6 +289,12 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
}
private void validate(IdealState idealState, RoutingTableProvider routingTableEV,
+ RoutingTableProvider routingTableCurrentStates, int timeout) throws Exception {
+ Assert.assertTrue(TestHelper // Fails unless TestHelper.verify reports compare() returned true.
+ .verify(() -> compare(idealState, routingTableEV, routingTableCurrentStates), timeout));
+ } // NOTE(review): timeout is passed straight to TestHelper.verify; presumably ms - confirm.
+
+ private boolean compare(IdealState idealState, RoutingTableProvider routingTableEV,
RoutingTableProvider routingTableCurrentStates) {
String db = idealState.getResourceName();
Set<String> partitions = idealState.getPartitionSet();
@@ -207,17 +303,63 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
routingTableEV.getInstancesForResource(db, partition, "MASTER");
List<InstanceConfig> masterInsCs =
routingTableCurrentStates.getInstancesForResource(db, partition, "MASTER");
- Assert.assertEquals(masterInsEv.size(), 1);
- Assert.assertEquals(masterInsCs.size(), 1);
- Assert.assertEquals(masterInsCs, masterInsEv);
-
+ if (masterInsEv.size() != 1 || masterInsCs.size() != 1 || !masterInsCs.equals(masterInsEv)) {
+ return false;
+ }
List<InstanceConfig> slaveInsEv =
routingTableEV.getInstancesForResource(db, partition, "SLAVE");
List<InstanceConfig> slaveInsCs =
routingTableCurrentStates.getInstancesForResource(db, partition, "SLAVE");
- Assert.assertEquals(slaveInsEv.size(), 2);
- Assert.assertEquals(slaveInsCs.size(), 2);
- Assert.assertEquals(new HashSet(slaveInsCs), new HashSet(slaveInsEv));
+ if (slaveInsEv.size() != 2 || slaveInsCs.size() != 2 || !new HashSet(slaveInsCs)
+ .equals(new HashSet(slaveInsEv))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+  /** Current-state provider whose callbacks wait for proceedNewEventHandling() while blocking. */
+  static class BlockingCurrentStateRoutingTableProvider extends RoutingTableProvider {
+    private final Semaphore _newEventHandlingPermits = new Semaphore(0);
+    private volatile boolean _isBlocking = false; // Set by the test, read in callback threads.
+
+    public BlockingCurrentStateRoutingTableProvider(HelixManager manager) {
+      super(manager, PropertyType.CURRENTSTATES);
+    }
+
+    public void setBlocking(boolean isBlocking) {
+      _isBlocking = isBlocking;
+    }
+
+    void proceedNewEventHandling() {
+      _newEventHandlingPermits.release(); // Lets one blocked (or the next) callback proceed.
+    }
+
+    @Override
+    @PreFetch(enabled = false)
+    public void onStateChange(String instanceName, List<CurrentState> statesInfo,
+        NotificationContext changeContext) {
+      awaitPermitIfBlocking();
+      super.onStateChange(instanceName, statesInfo, changeContext);
+    }
+
+    @Override
+    @PreFetch(enabled = true)
+    public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+        NotificationContext changeContext) {
+      awaitPermitIfBlocking();
+      super.onLiveInstanceChange(liveInstances, changeContext);
+    }
+
+    private void awaitPermitIfBlocking() {
+      if (_isBlocking) {
+        try {
+          _newEventHandlingPermits.acquire();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // Preserve the interrupt status for the caller.
+          throw new HelixException("Failed to acquire handling lock for testing.", e);
+        }
+      }
}
}
}