You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/09/25 18:20:41 UTC

[helix] branch master updated: Add initial callback when adding routing table listener (#1154)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new ea6bca2  Add initial callback when adding routing table listener (#1154)
ea6bca2 is described below

commit ea6bca2f26093af0a8ec4d94f25692040537d7d7
Author: Meng Zhang <mn...@linkedin.com>
AuthorDate: Fri Sep 25 11:20:31 2020 -0700

    Add initial callback when adding routing table listener (#1154)
    
    Add initial callback when adding routing table listener.
---
 .../helix/spectator/RoutingTableProvider.java      | 33 ++++++++++++++++++----
 .../spectator/TestRoutingTableProvider.java        | 28 +++++++++++++++---
 2 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 46f386d..37600ac 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -294,6 +294,10 @@ public class RoutingTableProvider
    * @param sourceDataTypeMap
    */
   private void validateSourceDataTypeMap(Map<PropertyType, List<String>> sourceDataTypeMap) {
+    if (sourceDataTypeMap == null) {
+      throw new IllegalArgumentException(
+          "The sourceDataTypeMap of Routing Table Provider should not be null");
+    }
     for (PropertyType propertyType : sourceDataTypeMap.keySet()) {
       if (propertyType.equals(PropertyType.CUSTOMIZEDVIEW)
           && sourceDataTypeMap.get(propertyType).size() == 0) {
@@ -302,10 +306,10 @@ public class RoutingTableProvider
       }
       if (!propertyType.equals(PropertyType.CUSTOMIZEDVIEW)
           && sourceDataTypeMap.get(propertyType).size() != 0) {
-        logger.error("Type has been used in addition to the propertyType {} !",
-            propertyType.name());
-        throw new HelixException(
-            String.format("Type %s has been used in addition to the propertyType %s !",
+        logger
+            .error("Type has been used in addition to the propertyType {} !", propertyType.name());
+        throw new HelixException(String
+            .format("Type %s has been used in addition to the propertyType %s !",
                 sourceDataTypeMap.get(propertyType), propertyType.name()));
       }
     }
@@ -403,6 +407,7 @@ public class RoutingTableProvider
     return snapshots;
   }
 
+
   /**
    * Add RoutingTableChangeListener with user defined context
    * @param routingTableChangeListener
@@ -410,9 +415,27 @@ public class RoutingTableProvider
    */
   public void addRoutingTableChangeListener(
       final RoutingTableChangeListener routingTableChangeListener, Object context) {
+    addRoutingTableChangeListener(routingTableChangeListener, context, false);
+  }
+
+  /**
+   * Add RoutingTableChangeListener with user defined context
+   * @param routingTableChangeListener
+   * @param context user defined context
+   * @param isTriggerCallback whether to trigger the initial callback during adding listener
+   */
+  public void addRoutingTableChangeListener(
+      final RoutingTableChangeListener routingTableChangeListener, Object context,
+      boolean isTriggerCallback) {
     _routingTableChangeListenerMap.put(routingTableChangeListener, new ListenerContext(context));
-    logger.info("Attach RoutingTableProviderChangeListener {}",
+    logger.info("Attach RoutingTableProviderChangeListener {}.",
         routingTableChangeListener.getClass().getName());
+    if (isTriggerCallback) {
+      logger.info("Force triggering a callback for the new listener in routing table provider");
+      final NotificationContext periodicRefreshContext = new NotificationContext(_helixManager);
+      periodicRefreshContext.setType(NotificationContext.Type.PERIODIC_REFRESH);
+      _routerUpdater.queueEvent(periodicRefreshContext, ClusterEventType.PeriodicalRebalance, null);
+      }
   }
 
   /**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
index 446da86..6f22b19 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
@@ -29,8 +29,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -85,6 +85,8 @@ public class TestRoutingTableProvider extends ZkTestBase {
   private static final AtomicBoolean customizedViewChangeCalled = new AtomicBoolean(false);
 
   class MockRoutingTableChangeListener implements RoutingTableChangeListener {
+    boolean routingTableChangeReceived = false;
+
     @Override
     public void onRoutingTableChange(RoutingTableSnapshot routingTableSnapshot, Object context) {
       Set<String> masterInstances = new HashSet<>();
@@ -102,6 +104,7 @@ public class TestRoutingTableProvider extends ZkTestBase {
       } else {
         _listenerTestResult = true;
       }
+      routingTableChangeReceived = true;
     }
   }
 
@@ -149,10 +152,10 @@ public class TestRoutingTableProvider extends ZkTestBase {
     _controller.syncStart();
 
     // start speculator
-    _routingTableProvider_default = new RoutingTableProvider();
     _spectator = HelixManagerFactory
         .getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, ZK_ADDR);
     _spectator.connect();
+    _routingTableProvider_default = new RoutingTableProvider(_spectator);
     _spectator.addExternalViewChangeListener(_routingTableProvider_default);
     _spectator.addLiveInstanceChangeListener(_routingTableProvider_default);
     _spectator.addInstanceConfigChangeListener(_routingTableProvider_default);
@@ -179,6 +182,22 @@ public class TestRoutingTableProvider extends ZkTestBase {
   }
 
   @Test
+  public void testInvocation() throws Exception {
+    MockRoutingTableChangeListener routingTableChangeListener = new MockRoutingTableChangeListener();
+    _routingTableProvider_default
+        .addRoutingTableChangeListener(routingTableChangeListener, null, true);
+
+    // Add a routing table provider listener should trigger an execution of the
+    // listener callbacks
+    Assert.assertTrue(TestHelper.verify(() -> {
+      if (!routingTableChangeListener.routingTableChangeReceived) {
+        return false;
+      }
+      return true;
+    }, TestHelper.WAIT_DURATION));
+  }
+
+  @Test(dependsOnMethods = { "testInvocation" })
   public void testRoutingTable() {
     Assert.assertEquals(_routingTableProvider_default.getLiveInstances().size(), _instances.size());
     Assert.assertEquals(_routingTableProvider_default.getInstanceConfigs().size(), _instances.size());
@@ -222,9 +241,10 @@ public class TestRoutingTableProvider extends ZkTestBase {
     Map<String, Set<String>> context = new HashMap<>();
     context.put("MASTER", Sets.newSet(_instances.get(0)));
     context.put("SLAVE", Sets.newSet(_instances.get(1), _instances.get(2)));
-    _routingTableProvider_default.addRoutingTableChangeListener(routingTableChangeListener, context);
     _routingTableProvider_default
-        .addRoutingTableChangeListener(new MockRoutingTableChangeListener(), null);
+        .addRoutingTableChangeListener(routingTableChangeListener, context, true);
+    _routingTableProvider_default
+        .addRoutingTableChangeListener(new MockRoutingTableChangeListener(), null, true);
     // reenable the master instance to cause change
     String prevMasterInstance = _instances.get(0);
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);