Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/21 08:56:24 UTC

[GitHub] jai1 commented on a change in pull request #1262: Broker should not start replicator for root partitioned-topic

jai1 commented on a change in pull request #1262: Broker should not start replicator for root partitioned-topic
URL: https://github.com/apache/incubator-pulsar/pull/1262#discussion_r169568719
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 ##########
 @@ -211,5 +215,42 @@ public static String getReplicatorName(String replicatorPrefix, String cluster)
         return (replicatorPrefix + "." + cluster).intern();
     }
 
+    /**
+     * Replication can't be started on root-partitioned-topic to avoid producer startup conflict.
+     * 
+     * <pre>
+     * eg:
+     * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then
+     * broker explicitly creates replicator producer for: "my-topic-partition-1" and "my-topic-partition-2".
+     * 
+     * However, if broker tries to start producer with root topic "my-topic" then client-lib internally creates individual 
+     * producers for "my-topic-partition-1" and "my-topic-partition-2" which creates conflict with existing 
+     * replicator producers.
+     * </pre>
+     * 
+     * Therefore, replicator can't be started on root-partition topic which can internally create multiple partitioned
+     * producers.
+     * 
+     * @param topicName
+     * @param brokerService
+     */
+    private void validatePartitionedTopic(String topicName, BrokerService brokerService) throws NamingException {
+        DestinationName destination = DestinationName.get(topicName);
+        String partitionedTopicPath = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
+                destination.getNamespace().toString(), destination.getDomain().toString(),
+                destination.getEncodedLocalName());
+        boolean isPartitionedTopic = false;
+        try {
+            isPartitionedTopic = brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(partitionedTopicPath).isPresent();
+        } catch (Exception e) {
+            log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage());
 
 Review comment:
   Should this throw an exception here instead of only logging a warning?
   
   Suggested message: "Failed to retrieve znode {}", partitionedTopicPath
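   A minimal sketch of the throwing variant, to make the suggestion concrete. It is not the PR's code: `PartitionedTopicCheck`, `MetadataLookup`, and `MetadataLookupException` are hypothetical stand-ins for the broker's configuration cache and whichever exception type the method ends up declaring. The only point is that a failed lookup is surfaced to the caller instead of leaving `isPartitionedTopic` at its default of `false`.

```java
import java.util.Optional;

// Hypothetical, self-contained illustration; none of these types exist in Pulsar.
final class PartitionedTopicCheck {

    /** Stand-in for the cache lookup used in the diff (assumed shape). */
    public interface MetadataLookup {
        Optional<String> get(String path) throws Exception;
    }

    /** Hypothetical exception type; the real method may reuse an existing one. */
    public static class MetadataLookupException extends Exception {
        public MetadataLookupException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Returns true when metadata exists at the given path.
     * A lookup failure is rethrown with the znode path in the message,
     * so the caller cannot mistake "lookup failed" for "not partitioned".
     */
    public static boolean isPartitionedTopic(MetadataLookup lookup, String partitionedTopicPath)
            throws MetadataLookupException {
        try {
            return lookup.get(partitionedTopicPath).isPresent();
        } catch (Exception e) {
            throw new MetadataLookupException(
                    "Failed to retrieve znode " + partitionedTopicPath, e);
        }
    }
}
```

   With this shape the caller decides whether a failed lookup should block replicator startup, and the original cause stays attached to the rethrown exception.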
