You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/07/16 02:09:46 UTC

[GitHub] [pulsar] liangyepianzhou opened a new pull request #11333: Add TransactionEnable in Namespace

liangyepianzhou opened a new pull request #11333:
URL: https://github.com/apache/pulsar/pull/11333


   Add TransactionEnable in Namespace. There are some problems in namespaceTest and it is being debugged. Submit after debugging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou closed pull request #11333: Add TransactionEnable in Namespace

Posted by GitBox <gi...@apache.org>.
liangyepianzhou closed pull request #11333:
URL: https://github.com/apache/pulsar/pull/11333


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #11333: Add TransactionEnable in Namespace

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #11333:
URL: https://github.com/apache/pulsar/pull/11333#issuecomment-882316459


   @sijie Oh, good point. One transaction might acrosses multiple namespaces. If parts of the namespaces are disabled transactions, this will screw up the transaction semantic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #11333: Add TransactionEnable in Namespace

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #11333:
URL: https://github.com/apache/pulsar/pull/11333#discussion_r671075036



##########
File path: conf/standalone.conf
##########
@@ -590,7 +590,7 @@ managedLedgerDefaultAckQuorum=1
 
 # How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds).
 # Default is 60 seconds
-managedLedgerCursorPositionFlushSeconds = 60
+managedLedgerCursorPositionFlushSeconds=60

Review comment:
       this change looks unrelated, can you please revert?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -1853,6 +1853,32 @@ public void removeNamespaceResourceGroup(@PathParam("tenant") String tenant,
         validateNamespaceName(tenant, namespace);
         internalSetNamespaceResourceGroup(null);
     }
+    @POST
+    @Path("/{tenant}/{namespace}/transactionEnabled")
+    @ApiOperation(value = "Update boolean of whether allow transaction of  namespace")

Review comment:
       "Enable or disable transactions on the given namespace"

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -1853,6 +1853,32 @@ public void removeNamespaceResourceGroup(@PathParam("tenant") String tenant,
         validateNamespaceName(tenant, namespace);
         internalSetNamespaceResourceGroup(null);
     }
+    @POST
+    @Path("/{tenant}/{namespace}/transactionEnabled")
+    @ApiOperation(value = "Update boolean of whether allow transaction of  namespace")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setTransactionEnable(
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "boolean of whether allow transaction of  namespace", required = true)
+                    boolean transactionEnable) {
+        validateNamespaceName(tenant, namespace);
+        internalSetTransactionEnabled(transactionEnable);
+    }
+    @GET
+    @Path("/{tenant}/{namespace}/transactionEnabled")
+    @ApiOperation(value = "The boolean of whether allow transaction of  namespace")

Review comment:
       "Return information about the activation of transactions on the given namespace"

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -140,11 +143,21 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
         this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
         this.setReplicated(replicated);
+        TopicName topicName1 = TopicName.get(getTopicName());
+        Optional<Policies> policies = null;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                     .get(AdminResource.path(POLICIES, topicName1.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       we must throw an error here, because we cannot proceed if we do not have this piece of information

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
##########
@@ -125,6 +125,8 @@
     @SuppressWarnings("checkstyle:MemberName")
     public String resource_group_name = null;
 
+    @SuppressWarnings("checkstyle:MemberName")
+    public boolean transaction_enable = false;

Review comment:
       This should be "true" by default, otherwise when you upgrade from Pulsar 2.8.0 to Pulsar 2.9.0 you will see Transactions disabled for every namespace.
   

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
##########
@@ -3616,6 +3616,66 @@ public void clearProperties(String namespace) throws PulsarAdminException {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public void setTransactionEnable(String namespace, boolean transactionEnable) throws PulsarAdminException {
+        try {
+            setTransactionEnableAsync(namespace, transactionEnable)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setTransactionEnableAsync(String namespace, boolean transactionEnable) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "transactionEnabled");
+        return asyncPostRequest(path, Entity.entity(transactionEnable, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public boolean getTransactionEnabled(String namespace) throws PulsarAdminException {
+        try {
+            return getTransactionEnabledAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> getTransactionEnabledAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "transactionEnabled");
+        final CompletableFuture<Boolean> future = new CompletableFuture<>();
+        asyncGetRequest(path,

Review comment:
       who does this work during an upgrade?
   if I make the request to a broker that is still at 2.8.0 version ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #11333: Add TransactionEnable in Namespace

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #11333:
URL: https://github.com/apache/pulsar/pull/11333#discussion_r671029110



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
##########
@@ -3616,6 +3616,66 @@ public void clearProperties(String namespace) throws PulsarAdminException {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public void setTransactionEnable(String namespace, boolean transactionEnable) throws PulsarAdminException {
+        try {
+            setTransactionEnableAsync(namespace, transactionEnable)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setTransactionEnableAsync(String namespace, boolean transactionEnable) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "transactionEnabled");
+        return asyncPostRequest(path, Entity.entity(transactionEnable, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public boolean getTransactionEnabled(String namespace) throws PulsarAdminException {
+        try {
+            return getTransactionEnabledAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> getTransactionEnabledAsync(String namespace) {

Review comment:
       Excuse me, I didn't understand what you mean




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #11333: Add TransactionEnable in Namespace

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #11333:
URL: https://github.com/apache/pulsar/pull/11333#discussion_r671028181



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -343,7 +351,16 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
         this.backloggedCursorThresholdEntries =
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
         this.transactionCompletableFuture = new CompletableFuture<>();
-        if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
+        TopicName topicName = TopicName.get(topic);
+        Optional<Policies> policies = null;
+        try {
+            policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, topicName.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()
+                && policies.get().transaction_enable) {
             this.transactionBuffer = brokerService.getPulsar()

Review comment:
       In my humble opinion, transactionBuffer should look like topic level.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #11333: Add TransactionEnable in Namespace

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #11333:
URL: https://github.com/apache/pulsar/pull/11333#discussion_r671023851



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -394,8 +416,16 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
                 }
             }
         }
-
+        TopicName topicName1 = TopicName.get(getTopicName());
+        Optional<Policies> policies = null;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, topicName1.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }

Review comment:
       There are one or two places used in a class, and there is no meeting point in itself.Is it necessary to abstract it into a method?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #11333: Add TransactionEnable in Namespace

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #11333:
URL: https://github.com/apache/pulsar/pull/11333#issuecomment-882318047


   Because it will affect cross-namespace transactions, namespace-level transactions will not be added temporarily


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #11333: Add TransactionEnable in Namespace

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #11333:
URL: https://github.com/apache/pulsar/pull/11333#discussion_r670967460



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -140,11 +143,21 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
         this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
         this.setReplicated(replicated);
+        TopicName topicName1 = TopicName.get(getTopicName());
+        Optional<Policies> policies = null;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                     .get(AdminResource.path(POLICIES, topicName1.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       Same as above

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -343,7 +351,16 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
         this.backloggedCursorThresholdEntries =
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
         this.transactionCompletableFuture = new CompletableFuture<>();
-        if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
+        TopicName topicName = TopicName.get(topic);
+        Optional<Policies> policies = null;
+        try {
+            policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, topicName.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()
+                && policies.get().transaction_enable) {
             this.transactionBuffer = brokerService.getPulsar()

Review comment:
       If we do not create a transactionBuffer, will the client still send data to the TC?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -394,8 +416,16 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
                 }
             }
         }
-
+        TopicName topicName1 = TopicName.get(getTopicName());
+        Optional<Policies> policies = null;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, topicName1.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }

Review comment:
       Should we abstract it as a method?

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
##########
@@ -3616,6 +3616,66 @@ public void clearProperties(String namespace) throws PulsarAdminException {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public void setTransactionEnable(String namespace, boolean transactionEnable) throws PulsarAdminException {
+        try {
+            setTransactionEnableAsync(namespace, transactionEnable)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setTransactionEnableAsync(String namespace, boolean transactionEnable) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "transactionEnabled");
+        return asyncPostRequest(path, Entity.entity(transactionEnable, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public boolean getTransactionEnabled(String namespace) throws PulsarAdminException {
+        try {
+            return getTransactionEnabledAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> getTransactionEnabledAsync(String namespace) {

Review comment:
       We also need to add CMD

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -482,9 +486,18 @@ private void checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageId
     }
 
     private boolean isTransactionEnabled() {
+        TopicName topicName = TopicName.get(subscription.getTopicName());
+        Optional<Policies> policies = null;
+        try {
+            policies = cnx.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, topicName.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       Please log error

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -482,9 +486,18 @@ private void checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageId
     }
 
     private boolean isTransactionEnabled() {
+        TopicName topicName = TopicName.get(subscription.getTopicName());
+        Optional<Policies> policies = null;
+        try {
+            policies = cnx.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, topicName.getNamespaceObject().toString()));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
         return subscription instanceof PersistentSubscription
                 && ((PersistentTopic) subscription.getTopic())
-                .getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled();
+                .getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
+                && policies.get().transaction_enable;

Review comment:
       There may be NPE, because policies can be null




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org