You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/03 11:09:12 UTC

[GitHub] [pulsar] liangyepianzhou opened a new pull request #11543: Donot create transaction components for function work topic

liangyepianzhou opened a new pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543


   
   Fixes https://github.com/apache/pulsar/issues/11481 
   ### Motivation
   - Assist https://github.com/apache/pulsar/pull/11494 to fix the problem that transaction cannot be started together with function
   - The three topics of the function were created twice at startup, and the second time the topic’s createIfMissing was set to false, causing the transaction’s PendingAck to fail to be created.
   - The three topics of the function do not require transaction components
   ### Modifications
   - In PersistentTopic and PersistentSubscription, filter the three topics when creating PendingAckHandler and TransactionBuffer
   ### Verifying this change
   Add ```TransactionTest::testFilterFunctionTopicForTransactionComponent```to verify
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   For this PR,  we don't  need to update docs.
   
     - This is just an optimization of the previous test
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou removed a comment on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou removed a comment on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892452597


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou closed pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou closed pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] zymap commented on a change in pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
zymap commented on a change in pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#discussion_r682248311



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -149,13 +153,30 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
         this.setReplicated(replicated);
         if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
-                && !checkTopicIsEventsNames(TopicName.get(topicName))) {
+                && !checkTopicIsEventsNames(TopicName.get(topicName))
+                && !checkTopicIsFunctionWorkerService(topic)) {
             this.pendingAckHandle = new PendingAckHandleImpl(this);
         } else {
             this.pendingAckHandle = new PendingAckHandleDisabled();
         }
         IS_FENCED_UPDATER.set(this, FALSE);
     }
+    private boolean checkTopicIsFunctionWorkerService(PersistentTopic topic){
+        String fnWorkerConfigFile =
+                Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
+        WorkerConfig workerConfig = null;
+        try {
+            workerConfig = WorkerConfig.load(fnWorkerConfigFile);
+        } catch (IOException e) {
+            e.printStackTrace();

Review comment:
       please handle the exception not print it

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -149,13 +153,30 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
         this.setReplicated(replicated);
         if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
-                && !checkTopicIsEventsNames(TopicName.get(topicName))) {
+                && !checkTopicIsEventsNames(TopicName.get(topicName))
+                && !checkTopicIsFunctionWorkerService(topic)) {
             this.pendingAckHandle = new PendingAckHandleImpl(this);
         } else {
             this.pendingAckHandle = new PendingAckHandleDisabled();
         }
         IS_FENCED_UPDATER.set(this, FALSE);
     }
+    private boolean checkTopicIsFunctionWorkerService(PersistentTopic topic){
+        String fnWorkerConfigFile =
+                Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
+        WorkerConfig workerConfig = null;
+        try {
+            workerConfig = WorkerConfig.load(fnWorkerConfigFile);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Assert.isNonEmpty(workerConfig);
+        Assert.isNonEmpty(topic);

Review comment:
       Don't we need to handle the returned value?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -149,13 +153,30 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
         this.setReplicated(replicated);
         if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
-                && !checkTopicIsEventsNames(TopicName.get(topicName))) {
+                && !checkTopicIsEventsNames(TopicName.get(topicName))
+                && !checkTopicIsFunctionWorkerService(topic)) {
             this.pendingAckHandle = new PendingAckHandleImpl(this);
         } else {
             this.pendingAckHandle = new PendingAckHandleDisabled();
         }
         IS_FENCED_UPDATER.set(this, FALSE);
     }
+    private boolean checkTopicIsFunctionWorkerService(PersistentTopic topic){
+        String fnWorkerConfigFile =

Review comment:
       The function work config file may be in a different place, user can define it with environment variable

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -300,6 +305,22 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
         }
         transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
     }
+    private boolean checkTopicIsFunctionWorkerService(PersistentTopic topic){

Review comment:
       We'd better find a common place to avoid the method duplicate




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892375218


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou removed a comment on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou removed a comment on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892452597






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou removed a comment on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou removed a comment on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892323204


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-891791375


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#discussion_r682647165



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CheckTopicIsSpecial.java
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.logging.log4j.core.util.Assert;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+
+public class CheckTopicIsSpecial {
+    public static  boolean checkTopicIsFunctionWorkerService(PersistentTopic topic)  {
+        if (!Assert.isNonEmpty(topic)){
+            throw new IllegalArgumentException("topic can`t be null");
+        }
+       WorkerConfig workerConfig =  topic.getBrokerService().getPulsar().getWorkerConfig().get();
+        String topicName = topic.getName();
+        return workerConfig.getClusterCoordinationTopic().equals(topicName)
+                || workerConfig.getFunctionAssignmentTopic().equals(topicName)
+                || workerConfig.getFunctionMetadataTopic().equals(topicName);

Review comment:
       This can only work for the broker run with the function worker? If the function working running independently, here will get an incorrect worker config.
   
   I think we already have a PR to support transaction buffer and pending ack lazy creation, if new transactions happens on a topic, the transaction buffer and the pending ack will not be created?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#discussion_r682647165



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CheckTopicIsSpecial.java
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.logging.log4j.core.util.Assert;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+
+public class CheckTopicIsSpecial {
+    public static  boolean checkTopicIsFunctionWorkerService(PersistentTopic topic)  {
+        if (!Assert.isNonEmpty(topic)){
+            throw new IllegalArgumentException("topic can`t be null");
+        }
+       WorkerConfig workerConfig =  topic.getBrokerService().getPulsar().getWorkerConfig().get();
+        String topicName = topic.getName();
+        return workerConfig.getClusterCoordinationTopic().equals(topicName)
+                || workerConfig.getFunctionAssignmentTopic().equals(topicName)
+                || workerConfig.getFunctionMetadataTopic().equals(topicName);

Review comment:
       This can only work for the broker run with the function worker? If the function working running independently, here will get an incorrect worker config.
   
   I think we already have a PR to support transaction buffer and pending ack lazy creation, if new transactions happens on a topic, the transaction buffer and the pending ack will not be created?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892550492






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou removed a comment on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou removed a comment on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892258071


   /pulsarbot run-failure-checks
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892452597


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892258071


   /pulsarbot run-failure-checks
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou removed a comment on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou removed a comment on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-891791375


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892550492


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou closed pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou closed pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou removed a comment on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou removed a comment on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892278770


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892578384


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou removed a comment on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou removed a comment on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892550492


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou removed a comment on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou removed a comment on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892375218


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892323204


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #11543: Donot create transaction components for function work topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #11543:
URL: https://github.com/apache/pulsar/pull/11543#issuecomment-892278770


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org