You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/09/25 18:20:41 UTC
[helix] branch master updated: Add initial callback when adding
routing table listener (#1154)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new ea6bca2 Add initial callback when adding routing table listener (#1154)
ea6bca2 is described below
commit ea6bca2f26093af0a8ec4d94f25692040537d7d7
Author: Meng Zhang <mn...@linkedin.com>
AuthorDate: Fri Sep 25 11:20:31 2020 -0700
Add initial callback when adding routing table listener (#1154)
Add initial callback when adding routing table listener.
---
.../helix/spectator/RoutingTableProvider.java | 33 ++++++++++++++++++----
.../spectator/TestRoutingTableProvider.java | 28 +++++++++++++++---
2 files changed, 52 insertions(+), 9 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 46f386d..37600ac 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -294,6 +294,10 @@ public class RoutingTableProvider
* @param sourceDataTypeMap
*/
private void validateSourceDataTypeMap(Map<PropertyType, List<String>> sourceDataTypeMap) {
+ if (sourceDataTypeMap == null) {
+ throw new IllegalArgumentException(
+ "The sourceDataTypeMap of Routing Table Provider should not be null");
+ }
for (PropertyType propertyType : sourceDataTypeMap.keySet()) {
if (propertyType.equals(PropertyType.CUSTOMIZEDVIEW)
&& sourceDataTypeMap.get(propertyType).size() == 0) {
@@ -302,10 +306,10 @@ public class RoutingTableProvider
}
if (!propertyType.equals(PropertyType.CUSTOMIZEDVIEW)
&& sourceDataTypeMap.get(propertyType).size() != 0) {
- logger.error("Type has been used in addition to the propertyType {} !",
- propertyType.name());
- throw new HelixException(
- String.format("Type %s has been used in addition to the propertyType %s !",
+ logger
+ .error("Type has been used in addition to the propertyType {} !", propertyType.name());
+ throw new HelixException(String
+ .format("Type %s has been used in addition to the propertyType %s !",
sourceDataTypeMap.get(propertyType), propertyType.name()));
}
}
@@ -403,6 +407,7 @@ public class RoutingTableProvider
return snapshots;
}
+
/**
* Add RoutingTableChangeListener with user defined context
* @param routingTableChangeListener
@@ -410,9 +415,27 @@ public class RoutingTableProvider
*/
public void addRoutingTableChangeListener(
final RoutingTableChangeListener routingTableChangeListener, Object context) {
+ addRoutingTableChangeListener(routingTableChangeListener, context, false);
+ }
+
+ /**
+ * Add RoutingTableChangeListener with user defined context
+ * @param routingTableChangeListener
+ * @param context user defined context
+ * @param isTriggerCallback whether to trigger the initial callback during adding listener
+ */
+ public void addRoutingTableChangeListener(
+ final RoutingTableChangeListener routingTableChangeListener, Object context,
+ boolean isTriggerCallback) {
_routingTableChangeListenerMap.put(routingTableChangeListener, new ListenerContext(context));
- logger.info("Attach RoutingTableProviderChangeListener {}",
+ logger.info("Attach RoutingTableProviderChangeListener {}.",
routingTableChangeListener.getClass().getName());
+ if (isTriggerCallback) {
+ logger.info("Force triggering a callback for the new listener in routing table provider");
+ final NotificationContext periodicRefreshContext = new NotificationContext(_helixManager);
+ periodicRefreshContext.setType(NotificationContext.Type.PERIODIC_REFRESH);
+ _routerUpdater.queueEvent(periodicRefreshContext, ClusterEventType.PeriodicalRebalance, null);
+ }
}
/**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
index 446da86..6f22b19 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
@@ -29,8 +29,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.helix.AccessOption;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
@@ -85,6 +85,8 @@ public class TestRoutingTableProvider extends ZkTestBase {
private static final AtomicBoolean customizedViewChangeCalled = new AtomicBoolean(false);
class MockRoutingTableChangeListener implements RoutingTableChangeListener {
+ boolean routingTableChangeReceived = false;
+
@Override
public void onRoutingTableChange(RoutingTableSnapshot routingTableSnapshot, Object context) {
Set<String> masterInstances = new HashSet<>();
@@ -102,6 +104,7 @@ public class TestRoutingTableProvider extends ZkTestBase {
} else {
_listenerTestResult = true;
}
+ routingTableChangeReceived = true;
}
}
@@ -149,10 +152,10 @@ public class TestRoutingTableProvider extends ZkTestBase {
_controller.syncStart();
// start speculator
- _routingTableProvider_default = new RoutingTableProvider();
_spectator = HelixManagerFactory
.getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, ZK_ADDR);
_spectator.connect();
+ _routingTableProvider_default = new RoutingTableProvider(_spectator);
_spectator.addExternalViewChangeListener(_routingTableProvider_default);
_spectator.addLiveInstanceChangeListener(_routingTableProvider_default);
_spectator.addInstanceConfigChangeListener(_routingTableProvider_default);
@@ -179,6 +182,22 @@ public class TestRoutingTableProvider extends ZkTestBase {
}
@Test
+ public void testInvocation() throws Exception {
+ MockRoutingTableChangeListener routingTableChangeListener = new MockRoutingTableChangeListener();
+ _routingTableProvider_default
+ .addRoutingTableChangeListener(routingTableChangeListener, null, true);
+
+ // Add a routing table provider listener should trigger an execution of the
+ // listener callbacks
+ Assert.assertTrue(TestHelper.verify(() -> {
+ if (!routingTableChangeListener.routingTableChangeReceived) {
+ return false;
+ }
+ return true;
+ }, TestHelper.WAIT_DURATION));
+ }
+
+ @Test(dependsOnMethods = { "testInvocation" })
public void testRoutingTable() {
Assert.assertEquals(_routingTableProvider_default.getLiveInstances().size(), _instances.size());
Assert.assertEquals(_routingTableProvider_default.getInstanceConfigs().size(), _instances.size());
@@ -222,9 +241,10 @@ public class TestRoutingTableProvider extends ZkTestBase {
Map<String, Set<String>> context = new HashMap<>();
context.put("MASTER", Sets.newSet(_instances.get(0)));
context.put("SLAVE", Sets.newSet(_instances.get(1), _instances.get(2)));
- _routingTableProvider_default.addRoutingTableChangeListener(routingTableChangeListener, context);
_routingTableProvider_default
- .addRoutingTableChangeListener(new MockRoutingTableChangeListener(), null);
+ .addRoutingTableChangeListener(routingTableChangeListener, context, true);
+ _routingTableProvider_default
+ .addRoutingTableChangeListener(new MockRoutingTableChangeListener(), null, true);
// reenable the master instance to cause change
String prevMasterInstance = _instances.get(0);
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);