Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/10 22:05:05 UTC

[GitHub] [pulsar] merlimat opened a new pull request #10532: PIP-45: Migrate NamespaceService to use MetadataStore

merlimat opened a new pull request #10532:
URL: https://github.com/apache/pulsar/pull/10532


   ### Motivation
   
   Converted the NamespaceService to use MetadataStore instead of direct ZooKeeper access.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #10532: PIP-45: Migrate NamespaceService to use MetadataStore

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10532:
URL: https://github.com/apache/pulsar/pull/10532#discussion_r630565187



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -916,44 +885,23 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
      *
      * @param nsname
      * @param nsBundles
-     * @param callback
      * @throws Exception
      */
-    private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles, StatCallback callback)
-            throws Exception {
+    private CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) {
         checkNotNull(nsname);
         checkNotNull(nsBundles);
         String path = joinPath(LOCAL_POLICIES_ROOT, nsname.toString());
-        Optional<LocalPolicies> policies = pulsar.getLocalZkCacheService().policiesCache().get(path);
 
-        if (!policies.isPresent()) {
-            // if policies is not present into localZk then create new policies
-            policies = this.pulsar.getLocalZkCacheService().createPolicies(path, false)
-                    .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+        LocalPolicies localPolicies = nsBundles.toLocalPolicies();
+        byte[] data;
+        try {
+            data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localPolicies);
+        } catch (Exception e) {

Review comment:
       Yes, fixed







[GitHub] [pulsar] merlimat merged pull request #10532: PIP-45: Migrate NamespaceService to use MetadataStore

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #10532:
URL: https://github.com/apache/pulsar/pull/10532


   





[GitHub] [pulsar] eolivelli commented on a change in pull request #10532: PIP-45: Migrate NamespaceService to use MetadataStore

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10532:
URL: https://github.com/apache/pulsar/pull/10532#discussion_r630300896



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -687,18 +685,7 @@ private boolean isBrokerActive(String candidateBroker) throws KeeperException, I
     }
 
     public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle) {
-        String bundlePath = ServiceUnitZkUtils.path(bundle);
-        CompletableFuture<Boolean> isExistFuture = new CompletableFuture<Boolean>();
-        pulsar.getLocalZkCache().getZooKeeper().exists(bundlePath, false, (rc, path, ctx, stat) -> {
-            if (rc == Code.OK.intValue()) {
-                isExistFuture.complete(true);
-            } else if (rc == Code.NONODE.intValue()) {
-                isExistFuture.complete(false);
-            } else {
-                isExistFuture.completeExceptionally(KeeperException.create(rc));
-            }
-        }, null);
-        return isExistFuture;
+        return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));

Review comment:
       It is amazing to see how much simpler this code is!
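
       The simplification in this hunk can be illustrated in isolation. What follows is only a minimal sketch under stated assumptions: `CallbackClient`, `ExistsCallback`, `FutureStore`, and the `OK`/`NONODE` constants are hypothetical stand-ins for a callback-style client and a future-returning store, not the real ZooKeeper or Pulsar MetadataStore interfaces. It shows why a hand-written callback bridge collapses to a single call once the store itself returns a `CompletableFuture<Boolean>`.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class ExistsSketch {
    // Hypothetical return codes; they only mimic the idea of ZooKeeper result codes.
    static final int OK = 0;
    static final int NONODE = -101;

    /** Hypothetical callback interface (an assumption, not a real API). */
    interface ExistsCallback {
        void done(int rc, String path);
    }

    /** Hypothetical callback-style client backed by an in-memory set of paths. */
    static class CallbackClient {
        private final Set<String> paths;

        CallbackClient(Set<String> paths) {
            this.paths = paths;
        }

        void exists(String path, ExistsCallback cb) {
            cb.done(paths.contains(path) ? OK : NONODE, path);
        }
    }

    /** Old shape: the caller translates result codes into a future by hand. */
    static CompletableFuture<Boolean> existsViaCallback(CallbackClient client, String path) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        client.exists(path, (rc, p) -> {
            if (rc == OK) {
                future.complete(true);
            } else if (rc == NONODE) {
                future.complete(false);
            } else {
                future.completeExceptionally(new IllegalStateException("rc=" + rc));
            }
        });
        return future;
    }

    /** Hypothetical store whose exists() already returns a future. */
    static class FutureStore {
        private final Set<String> paths;

        FutureStore(Set<String> paths) {
            this.paths = paths;
        }

        CompletableFuture<Boolean> exists(String path) {
            return CompletableFuture.completedFuture(paths.contains(path));
        }
    }

    /** New shape: nothing to translate, so the method is a one-line delegation. */
    static CompletableFuture<Boolean> existsViaStore(FutureStore store, String path) {
        return store.exists(path);
    }
}
```

       Both methods expose the same signature to callers, so code that composes on the returned future does not need to change when the bridge is removed.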

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -916,44 +885,23 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
      *
      * @param nsname
      * @param nsBundles
-     * @param callback
      * @throws Exception
      */
-    private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles, StatCallback callback)
-            throws Exception {
+    private CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) {
         checkNotNull(nsname);
         checkNotNull(nsBundles);
         String path = joinPath(LOCAL_POLICIES_ROOT, nsname.toString());
-        Optional<LocalPolicies> policies = pulsar.getLocalZkCacheService().policiesCache().get(path);
 
-        if (!policies.isPresent()) {
-            // if policies is not present into localZk then create new policies
-            policies = this.pulsar.getLocalZkCacheService().createPolicies(path, false)
-                    .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+        LocalPolicies localPolicies = nsBundles.toLocalPolicies();
+        byte[] data;
+        try {
+            data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localPolicies);
+        } catch (Exception e) {

Review comment:
       Is this an IOException?
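
       For context, Jackson's `ObjectMapper.writeValueAsBytes` declares `JsonProcessingException`, which extends `IOException`, so the catch could be narrowed. The hunk does not show what the catch body does, so the following is only a sketch of one possible shape: `Serializer` and `Writer` are hypothetical interfaces introduced here for illustration, and returning a failed future from the catch block is an assumption, not the PR's actual code.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class SerializeSketch {
    /** Hypothetical serializer; stands in for a JSON mapper that throws IOException. */
    interface Serializer<T> {
        byte[] toBytes(T value) throws IOException;
    }

    /** Hypothetical asynchronous writer; stands in for a metadata store put. */
    interface Writer {
        CompletableFuture<Void> put(String path, byte[] data);
    }

    /**
     * Serializes the value and writes it. A serialization failure is caught
     * as IOException (not the broader Exception) and reported through the
     * returned future, so the method never throws synchronously for that case.
     */
    static <T> CompletableFuture<Void> update(String path, T value,
                                              Serializer<T> serializer, Writer writer) {
        byte[] data;
        try {
            data = serializer.toBytes(value);
        } catch (IOException e) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        return writer.put(path, data);
    }
}
```

       Catching only `IOException` lets unexpected runtime exceptions propagate instead of being folded into the serialization error path.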



