You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/10/28 01:53:11 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #5457: [transaction-coordinator] Ownership change listeners

codelipenghui commented on a change in pull request #5457: [transaction-coordinator] Ownership change listeners
URL: https://github.com/apache/pulsar/pull/5457#discussion_r339381328
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 ##########
 @@ -858,6 +867,89 @@ public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData
         bundleFactory.invalidateBundleCache(nsName);
     }
 
+    /**
+     * Dispatches the "load" event for {@code bundle} to the registered ownership listeners.
+     *
+     * <p>Every bundle listener whose filter accepts the bundle receives {@code onLoad(bundle)}.
+     * If at least one topic listener is registered, the bundle's owned-topic list is then
+     * requested via {@code getOwnedTopicListForNamespaceBundle}; when that future completes
+     * successfully, every topic listener whose filter accepts a topic receives
+     * {@code onLoad(topicName)}.
+     *
+     * <p>Anything thrown by a listener <em>or by its filter</em> is logged and does not prevent
+     * the remaining listeners/topics from being notified.
+     *
+     * @param bundle the namespace bundle to announce to the listeners
+     */
+    protected void onNamespaceBundleOwned(NamespaceBundle bundle) {
+        for (NamespaceBundleOwnershipListener bundleOwnedListener : bundleOwnershipListeners) {
+            try {
+                if (bundleOwnedListener.getFilter().test(bundle)) {
+                    bundleOwnedListener.onLoad(bundle);
+                }
+            } catch (Throwable t) {
+                LOG.error("Call bundle {} ownership lister error", bundle, t);
+            }
+        }
+        if (!CollectionUtils.isEmpty(topicOwnershipListeners)) {
+            getOwnedTopicListForNamespaceBundle(bundle).whenComplete((topics, ex) -> {
+                if (ex != null) {
+                    LOG.error("Get owned topic list for namespace bundle {} error.", bundle, ex);
+                    return;
+                }
+                for (String topic : topics) {
+                    TopicName topicName = TopicName.get(topic);
+                    for (TopicOwnershipListener topicOwnershipListener : topicOwnershipListeners) {
+                        // The filter is evaluated inside the try as well: the stage returned by
+                        // whenComplete is discarded, so an exception escaping this callback would
+                        // be lost silently and would skip every remaining listener and topic.
+                        try {
+                            if (topicOwnershipListener.getFilter().test(topicName)) {
+                                topicOwnershipListener.onLoad(topicName);
+                            }
+                        } catch (Throwable t) {
+                            LOG.error("Call topic {} ownership lister error", topic, t);
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Dispatches the "unload" event for {@code bundle} to the registered ownership listeners.
+     *
+     * <p>Every bundle listener whose filter accepts the bundle receives {@code unLoad(bundle)}.
+     * If at least one topic listener is registered, the bundle's owned-topic list is then
+     * requested via {@code getOwnedTopicListForNamespaceBundle}; when that future completes
+     * successfully, every topic listener whose filter accepts a topic receives
+     * {@code unLoad(topicName)}.
+     *
+     * <p>Anything thrown by a listener <em>or by its filter</em> is logged and does not prevent
+     * the remaining listeners/topics from being notified.
+     *
+     * @param bundle the namespace bundle whose unload is announced to the listeners
+     */
+    protected void onNamespaceBundleUnload(NamespaceBundle bundle) {
+        for (NamespaceBundleOwnershipListener bundleOwnedListener : bundleOwnershipListeners) {
+            try {
+                if (bundleOwnedListener.getFilter().test(bundle)) {
+                    bundleOwnedListener.unLoad(bundle);
+                }
+            } catch (Throwable t) {
+                LOG.error("Call bundle {} ownership lister error", bundle, t);
+            }
+        }
+        if (!CollectionUtils.isEmpty(topicOwnershipListeners)) {
+            getOwnedTopicListForNamespaceBundle(bundle).whenComplete((topics, ex) -> {
+                if (ex != null) {
+                    LOG.error("Get owned topic list for namespace bundle {} error.", bundle, ex);
+                    return;
+                }
+                for (String topic : topics) {
+                    TopicName topicName = TopicName.get(topic);
+                    for (TopicOwnershipListener topicOwnershipListener : topicOwnershipListeners) {
+                        // The filter is evaluated inside the try as well: the stage returned by
+                        // whenComplete is discarded, so an exception escaping this callback would
+                        // be lost silently and would skip every remaining listener and topic.
+                        try {
+                            if (topicOwnershipListener.getFilter().test(topicName)) {
+                                topicOwnershipListener.unLoad(topicName);
+                            }
+                        } catch (Throwable t) {
+                            LOG.error("Call topic {} ownership lister error", topic, t);
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener... listeners) {
+        checkNotNull(listeners);
+        for (NamespaceBundleOwnershipListener listener : listeners) {
+            if (listener != null) {
+                bundleOwnershipListeners.add(listener);
+            }
+        }
+        getOwnedServiceUnits().forEach(this::onNamespaceBundleOwned);
+    }
+
+    public void addTopicOwnershipListener(TopicOwnershipListener... listeners) {
+        checkNotNull(listeners);
+        for (TopicOwnershipListener listener : listeners) {
+            if (listener != null) {
+                topicOwnershipListeners.add(listener);
+            }
+        }
 
 Review comment:
   yes

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services