Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/29 05:47:12 UTC

[GitHub] [pulsar] rdhabalia opened a new pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

rdhabalia opened a new pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235


   ### Motivation
   Users select a topic type based on the application's use case and the topic behavior it requires. In some circumstances, however, users need additional behavior on top of the existing implementation, or want to inject a custom workflow into existing topic behavior. This is mostly needed when users want to migrate topics or Pulsar clusters smoothly without impacting producer and consumer applications. In such scenarios, users can override the publish or dispatch behavior of a topic and plug in the additional workflow. For example, they can perform dual writes to multiple topics during a migration, skip messages published from a specific source without explicit publish failures, or ignore a specific subscription source without generating a client-side error or otherwise impacting client applications. This feature is useful for any migration in which the Pulsar cluster is serving live topics and requires custom topic-level treatment for a flawless server-side migration that does not impact client applications, especially legacy applications that are hard to change.
   
   ### Modification
   - Add the broker-side config `topicFactoryClassName` and the `TopicFactory` interface.
   - Mark the feature as experimental to accommodate future API changes.
   
   ### Result
   - Users can create topics with a custom workflow for their server-side migration use cases.
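
A minimal sketch of the kind of custom workflow described in the motivation, here "skip messages published from a specific source without explicit publish failures". Everything in it is an illustrative assumption rather than code from this PR: `SketchTopic`, `StoringTopic`, `SkippingTopic`, `SketchTopicFactory`, `MigrationTopicFactory` and the `legacy-app` producer name are hypothetical stand-ins, and the one-argument `create(String)` is a simplification of the `create(...)` signature proposed here.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Pulsar's Topic, reduced to a single publish call.
interface SketchTopic {
    boolean publish(String producerName, String payload);
}

// Hypothetical default behavior: every published message is stored.
class StoringTopic implements SketchTopic {
    final List<String> stored = new ArrayList<>();

    @Override
    public boolean publish(String producerName, String payload) {
        stored.add(payload);
        return true;
    }
}

// Hypothetical custom workflow: messages from one producer are dropped, but the
// call still reports success, so that producer sees no publish failure.
class SkippingTopic implements SketchTopic {
    private final SketchTopic delegate;
    private final String skippedProducer;

    SkippingTopic(SketchTopic delegate, String skippedProducer) {
        this.delegate = delegate;
        this.skippedProducer = skippedProducer;
    }

    @Override
    public boolean publish(String producerName, String payload) {
        if (skippedProducer.equals(producerName)) {
            return true; // acknowledged, intentionally not stored
        }
        return delegate.publish(producerName, payload);
    }
}

// Simplified factory shape (assumption). The create(...) method in this PR also
// takes a ManagedLedger, a BrokerService and the requested topic class.
interface SketchTopicFactory extends Closeable {
    SketchTopic create(String topicName);
}

class MigrationTopicFactory implements SketchTopicFactory {
    // One shared backing topic keeps the sketch small.
    final StoringTopic backing = new StoringTopic();

    @Override
    public SketchTopic create(String topicName) {
        return new SkippingTopic(backing, "legacy-app");
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

In this sketch the wrapper decides per message whether to delegate, so producers and consumers need no change; a dual-write workflow could follow the same shape by delegating to two topics instead of one.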


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] rdhabalia commented on pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#issuecomment-1061207164


   ping





[GitHub] [pulsar] eolivelli commented on a change in pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#discussion_r753671325



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.mledger.ManagedLedger;
+
+/**
+ * Pluggable TopicFactory to create topic with specific behavior in broker.
+ * Note: This API and feature is in experimental phase.
+ */
+public interface TopicFactory {
+
+    <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class<T> topicClazz);

Review comment:
       Should we add a `close` method?
   This would allow cleanup operations.







[GitHub] [pulsar] rdhabalia commented on pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#issuecomment-1007859541


   Ping. @eolivelli, can you please unblock the PR if your concern is addressed?





[GitHub] [pulsar] rdhabalia commented on pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#issuecomment-974599118


   ping





[GitHub] [pulsar] Anonymitaet commented on pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#issuecomment-930938378


   Thanks for your contribution. For this PR, do we need to update docs?
   
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks) 





[GitHub] [pulsar] rdhabalia commented on a change in pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#discussion_r803042222



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -2809,6 +2817,35 @@ public long getPausedConnections() {
         return pausedConnections.longValue();
     }
 
+    @SuppressWarnings("unchecked")
+    private <T extends Topic> T newTopic(String topic, ManagedLedger ledger, BrokerService brokerService,
+            Class<T> topicClazz) {
+        if (topicFactory != null) {
+            try {
+                Topic newTopic = topicFactory.create(topic, ledger, brokerService, topicClazz);
+                if (newTopic != null) {
+                    return (T) newTopic;
+                }
+            } catch (Throwable e) {
+                log.warn("Failed to create persistent topic using factory {}, {}", topic, e.getMessage());

Review comment:
       Actually, handling the exception and falling back to the default implementation was intentional, to avoid topic unavailability. However, I have updated it to throw an exception for now so that we can merge this PR, as it has been open for almost 6 months.







[GitHub] [pulsar] eolivelli commented on a change in pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#discussion_r780824370



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
##########
@@ -1859,4 +1870,47 @@ public void testProducerBusy() throws Exception {
 
         assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
     }
+
+    @Test(dataProvider = "topic")
+    public void testPersistentTopicFactory(boolean isPersistent) throws Exception {
+        conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
+        restartBroker();
+
+        final String topicName = (isPersistent ? "persistent" : "non-persistent") + "://prop/ns-abc/factoryTopic"
+                + isPersistent;
+        MyTopicFactory.count.set(0);
+
+        // 1. producer connect
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
+
+        assertTrue(MyTopicFactory.count.get() > 0);
+        producer.close();
+        consumer.close();
+    }
+
+    public static class MyTopicFactory implements TopicFactory {
+        private static AtomicInteger count = new AtomicInteger(0);
+
+        @Override
+        public <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService,
+                Class<T> topicClazz) {
+            try {
+                count.incrementAndGet();
+                if(topicClazz == NonPersistentTopic.class) {
+                    return (T) new NonPersistentTopic(topic, brokerService);
+                }else {
+                    return (T) new PersistentTopic(topic, ledger, brokerService); 
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            // No-op

Review comment:
       Please verify the number of calls to this method

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -2809,6 +2817,35 @@ public long getPausedConnections() {
         return pausedConnections.longValue();
     }
 
+    @SuppressWarnings("unchecked")
+    private <T extends Topic> T newTopic(String topic, ManagedLedger ledger, BrokerService brokerService,
+            Class<T> topicClazz) {
+        if (topicFactory != null) {
+            try {
+                Topic newTopic = topicFactory.create(topic, ledger, brokerService, topicClazz);
+                if (newTopic != null) {
+                    return (T) newTopic;
+                }
+            } catch (Throwable e) {
+                log.warn("Failed to create persistent topic using factory {}, {}", topic, e.getMessage());

Review comment:
       We should fail here; otherwise the system will use the default implementation, leading to unpredictable behaviour.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -327,6 +328,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
         this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
         this.blockedDispatchers = new ConcurrentOpenHashSet<>();
+        this.topicFactory = createPersistentTopicFactory();

Review comment:
       We are not shutting down (calling `close` on) the new instance.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -2809,6 +2817,35 @@ public long getPausedConnections() {
         return pausedConnections.longValue();
     }
 
+    @SuppressWarnings("unchecked")
+    private <T extends Topic> T newTopic(String topic, ManagedLedger ledger, BrokerService brokerService,
+            Class<T> topicClazz) {
+        if (topicFactory != null) {
+            try {
+                Topic newTopic = topicFactory.create(topic, ledger, brokerService, topicClazz);
+                if (newTopic != null) {
+                    return (T) newTopic;
+                }
+            } catch (Throwable e) {
+                log.warn("Failed to create persistent topic using factory {}, {}", topic, e.getMessage());
+            }
+        }
+        return topicClazz == NonPersistentTopic.class ? (T) new NonPersistentTopic(topic, BrokerService.this)
+                : (T) new PersistentTopic(topic, ledger, brokerService);
+    }
+
+    private TopicFactory createPersistentTopicFactory() {
+        String topicFactoryClassName = pulsar.getConfig().getTopicFactoryClassName();
+        if (StringUtils.isNotBlank(topicFactoryClassName)) {
+            try {
+                return (TopicFactory) Class.forName(topicFactoryClassName).newInstance();
+            } catch (Exception e) {
+                log.warn("Failed to initialize topic factory class {}", topicFactoryClassName, e);

Review comment:
       We should fail, otherwise the system will use the default configuration, leading to unexpected behaviour 
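
The fail-fast loading suggested in this comment could look roughly like the sketch below. It is a self-contained illustration under stated assumptions, not the code in this PR: `FactoryLoader` is a hypothetical helper, and `java.io.Closeable` stands in for `TopicFactory` so that the example compiles without Pulsar on the classpath.

```java
import java.io.Closeable;

// Hypothetical helper; Closeable stands in for TopicFactory so the sketch
// compiles without Pulsar on the classpath.
final class FactoryLoader {
    private FactoryLoader() {
    }

    static Closeable load(String className) {
        if (className == null || className.trim().isEmpty()) {
            return null; // nothing configured: the caller keeps the default topics
        }
        try {
            return Class.forName(className)
                    .asSubclass(Closeable.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Fail instead of logging and silently using the default implementation.
            throw new IllegalStateException(
                    "Failed to initialize topic factory class " + className, e);
        }
    }
}
```

With this shape, a blank class name still means "no custom factory", while a misspelled or incompatible class name stops startup instead of quietly running with default topics.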







[GitHub] [pulsar] rdhabalia commented on pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#issuecomment-1036483631


   @eolivelli ping





[GitHub] [pulsar] rdhabalia commented on pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#issuecomment-1034144092


   @eolivelli I have addressed the comments. Please unblock when you get a chance.





[GitHub] [pulsar] rdhabalia commented on a change in pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#discussion_r772770710



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.mledger.ManagedLedger;
+
+/**
+ * Pluggable TopicFactory to create topic with specific behavior in broker.
+ * Note: This API and feature is in experimental phase.
+ */
+public interface TopicFactory {
+
+    <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class<T> topicClazz);

Review comment:
       extended `Closeable` interface
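
A small sketch of the lifecycle that extending `Closeable` enables, and of how the number of `close` calls could be verified. The names `CountingFactory` and `FactoryOwner` are hypothetical; `FactoryOwner` merely stands in for whichever component holds the factory, and `create` returns a `String` only to keep the example free of Pulsar types.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that records how often it is used and closed.
class CountingFactory implements Closeable {
    final AtomicInteger created = new AtomicInteger();
    final AtomicInteger closed = new AtomicInteger();

    String create(String topicName) {
        created.incrementAndGet();
        return topicName; // a real factory would return a Topic
    }

    @Override
    public void close() {
        closed.incrementAndGet();
    }
}

// Hypothetical owner standing in for the component that holds the factory.
class FactoryOwner {
    private final Closeable factory;
    private boolean shutDown;

    FactoryOwner(Closeable factory) {
        this.factory = factory;
    }

    synchronized void shutdown() {
        if (shutDown) {
            return; // a repeated shutdown must not close the factory twice
        }
        shutDown = true;
        if (factory != null) {
            try {
                factory.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

A test along these lines can then assert that `closed` equals 1 after the owner shuts down, which is the kind of check requested for the no-op `close()` in `MyTopicFactory`.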







[GitHub] [pulsar] rdhabalia commented on pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#issuecomment-999140570


   @eolivelli I have addressed the comment. Can you PTAL and unblock the PR?





[GitHub] [pulsar] rdhabalia commented on a change in pull request #12235: [pulsar-broker] PIP-100 Support pluggable topic factory

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#discussion_r772619465



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.mledger.ManagedLedger;
+
+/**
+ * Pluggable TopicFactory to create topic with specific behavior in broker.
+ * Note: This API and feature is in experimental phase.
+ */
+public interface TopicFactory {
+
+    <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class<T> topicClazz);

Review comment:
       @eolivelli addressed it.



