You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 14:12:21 UTC

[pulsar] branch branch-2.9 updated (9606501 -> de2ad45)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 9606501  Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565)
     new 704277f  Add remote cluster info to replicator producer name. (#12734)
     new de2ad45  [Issue 12796][broker] Fix replicator error with user defined ReplicatorPrefix (#12797)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pulsar/broker/service/AbstractReplicator.java  |  3 +-
 .../org/apache/pulsar/broker/service/Producer.java | 21 ++++++++--
 .../pulsar/broker/service/ReplicatorTest.java      | 47 ++++++++++++++++++++++
 3 files changed, 67 insertions(+), 4 deletions(-)

[pulsar] 01/02: Add remote cluster info to replicator producer name. (#12734)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 704277f96c33429d9778e7d5cd63156b282a52f0
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Nov 15 16:17:03 2021 +0800

    Add remote cluster info to replicator producer name. (#12734)
    
    If the producer is a replicator client, the `remoteCluster` value is incorrect:
    
    ```
    Producer{topic=PersistentTopic{topic=persistent://public/default/tp1-partition-0}, client=/127.0.0.1:54628, producerName=pulsar.repl.cluster3, producerId=0, remoteCluster=cluster3, isRemote=true}
    ```
    
    - Add remote cluster info to the replicator producer builder.
    
    (cherry picked from commit 10818e8e39b9f039793b8534da0252f2f4c91a61)
---
 .../pulsar/broker/service/AbstractReplicator.java   |  3 ++-
 .../org/apache/pulsar/broker/service/Producer.java  | 11 ++++++++++-
 .../pulsar/broker/service/ReplicatorTest.java       | 21 +++++++++++++++++++++
 3 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 59ec74f..a404f10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -83,7 +83,8 @@ public abstract class AbstractReplicator {
                 .enableBatching(false)
                 .sendTimeout(0, TimeUnit.SECONDS) //
                 .maxPendingMessages(producerQueueSize) //
-                .producerName(getReplicatorName(replicatorPrefix, localCluster));
+                .producerName(String.format("%s%s%s", getReplicatorName(replicatorPrefix, localCluster),
+                        REPL_PRODUCER_NAME_DELIMITER, remoteCluster));
         STATE_UPDATER.set(this, State.Stopped);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 64e0fd6..bef4c54 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -128,7 +128,7 @@ public class Producer {
 
         this.isRemote = producerName
                 .startsWith(cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix());
-        this.remoteCluster = isRemote ? producerName.split("\\.")[2].split(REPL_PRODUCER_NAME_DELIMITER)[0] : null;
+        this.remoteCluster = parseRemoteClusterName(producerName, isRemote);
 
         this.isEncrypted = isEncrypted;
         this.schemaVersion = schemaVersion;
@@ -138,6 +138,15 @@ public class Producer {
         this.clientAddress = cnx.clientSourceAddress();
     }
 
+    private String parseRemoteClusterName(String producerName, boolean isRemote) {
+        if (isRemote) {
+            String clusterName = producerName.split("\\.")[2];
+            return clusterName.contains(REPL_PRODUCER_NAME_DELIMITER)
+                    ? clusterName.split(REPL_PRODUCER_NAME_DELIMITER)[0] : clusterName;
+        }
+        return null;
+    }
+
     /**
      * Method to determine if this producer can replace another producer.
      * @param other - producer to compare to this one
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 11c8e19..ac8f233 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -39,6 +39,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -715,6 +716,26 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertNull(producer);
     }
 
+    @Test(priority = 5, timeOut = 30000)
+    public void testReplicatorProducerName() throws Exception {
+        log.info("--- Starting ReplicatorTest::testReplicatorProducerName ---");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/testReplicatorProducerName");
+        final TopicName dest = TopicName.get(topicName);
+
+        @Cleanup
+        MessageProducer producer1 = new MessageProducer(url1, dest);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(pulsar2.getBrokerService().getTopicReference(topicName).isPresent());
+        });
+        Optional<Topic> topic = pulsar2.getBrokerService().getTopicReference(topicName);
+        assertTrue(topic.isPresent());
+        Set<String> remoteClusters = topic.get().getProducers().values().stream()
+                .map(org.apache.pulsar.broker.service.Producer::getRemoteCluster)
+                .collect(Collectors.toSet());
+        assertTrue(remoteClusters.contains("r1"));
+    }
+
     /**
      * Issue #199
      *

[pulsar] 02/02: [Issue 12796][broker] Fix replicator error with user defined ReplicatorPrefix (#12797)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit de2ad45df1ab7734ec4f20bc9f3290c0aee6429a
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Nov 17 16:45:31 2021 +0800

    [Issue 12796][broker] Fix replicator error with user defined ReplicatorPrefix (#12797)
    
    (cherry picked from commit bab2a81c17bf2205c0d9498c0d7a73c978777e4b)
---
 .../org/apache/pulsar/broker/service/Producer.java | 16 ++++++++-----
 .../pulsar/broker/service/ReplicatorTest.java      | 26 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index bef4c54..d72f904 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -126,9 +126,10 @@ public class Producer {
         stats.metadata = this.metadata;
         stats.accessMode = Commands.convertProducerAccessMode(accessMode);
 
-        this.isRemote = producerName
-                .startsWith(cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix());
-        this.remoteCluster = parseRemoteClusterName(producerName, isRemote);
+
+        String replicatorPrefix = cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix() + ".";
+        this.isRemote = producerName.startsWith(replicatorPrefix);
+        this.remoteCluster = parseRemoteClusterName(producerName, isRemote, replicatorPrefix);
 
         this.isEncrypted = isEncrypted;
         this.schemaVersion = schemaVersion;
@@ -138,9 +139,14 @@ public class Producer {
         this.clientAddress = cnx.clientSourceAddress();
     }
 
-    private String parseRemoteClusterName(String producerName, boolean isRemote) {
+    /**
+     * Producer name for replicator is in one of the following formats:
+     * "replicatorPrefix.localCluster" (old)
+     * "replicatorPrefix.localCluster-->remoteCluster" (new)
+     */
+    private String parseRemoteClusterName(String producerName, boolean isRemote, String replicatorPrefix) {
         if (isRemote) {
-            String clusterName = producerName.split("\\.")[2];
+            String clusterName = producerName.substring(replicatorPrefix.length());
             return clusterName.contains(REPL_PRODUCER_NAME_DELIMITER)
                     ? clusterName.split(REPL_PRODUCER_NAME_DELIMITER)[0] : clusterName;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index ac8f233..04f96ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -736,6 +736,32 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertTrue(remoteClusters.contains("r1"));
     }
 
+    @Test(priority = 5, timeOut = 30000)
+    public void testReplicatorProducerNameWithUserDefinedReplicatorPrefix() throws Exception {
+        log.info("--- Starting ReplicatorTest::testReplicatorProducerNameWithUserDefinedReplicatorPrefix ---");
+        final String topicName = BrokerTestUtil.newUniqueName(
+                "persistent://pulsar/ns/testReplicatorProducerNameWithUserDefinedReplicatorPrefix");
+        final TopicName dest = TopicName.get(topicName);
+
+        pulsar1.getConfiguration().setReplicatorPrefix("user-defined-prefix");
+        pulsar2.getConfiguration().setReplicatorPrefix("user-defined-prefix");
+        pulsar3.getConfiguration().setReplicatorPrefix("user-defined-prefix");
+
+        @Cleanup
+        MessageProducer producer1 = new MessageProducer(url1, dest);
+
+        Awaitility.await().untilAsserted(()->{
+            assertTrue(pulsar2.getBrokerService().getTopicReference(topicName).isPresent());
+        });
+        Optional<Topic> topic = pulsar2.getBrokerService().getTopicReference(topicName);
+        assertTrue(topic.isPresent());
+        Set<String> remoteClusters = topic.get().getProducers().values().stream()
+                .map(org.apache.pulsar.broker.service.Producer::getRemoteCluster)
+                .collect(Collectors.toSet());
+        assertTrue(remoteClusters.contains("r1"));
+    }
+
+
     /**
      * Issue #199
      *