You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/12 01:41:25 UTC

[pulsar] branch master updated: [pulsar-client] Fix broken replication msg to specific cluster (#4930)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 02f9fd3  [pulsar-client] Fix broken replication msg to specific cluster (#4930)
02f9fd3 is described below

commit 02f9fd3e055f404501e41413ec1081331ff945c3
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Aug 11 18:41:19 2019 -0700

    [pulsar-client] Fix broken replication msg to specific cluster (#4930)
---
 .../pulsar/broker/service/ReplicatorTest.java      | 39 ++++++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |  3 ++
 2 files changed, 42 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index eddb7cd..f68214a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -54,12 +54,14 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -812,6 +814,43 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
     }
 
+    @Test
+    public void testReplicatedCluster() throws Exception {
+
+        log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
+
+        final String namespace = "pulsar/global/repl";
+        final String topicName = String.format("persistent://%s/topic1", namespace);
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+        admin1.topics().createPartitionedTopic(topicName, 4);
+
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
+        byte[] value = "test".getBytes();
+
+        // publish message local only
+        TypedMessageBuilder<byte[]> msg = producer1.newMessage().replicationClusters(Lists.newArrayList("r1")).value(value);
+        msg.send();
+        assertEquals(consumer1.receive().getValue(), value);
+
+        Message<byte[]> msg2 = consumer2.receive(1, TimeUnit.SECONDS);
+        if (msg2 != null) {
+            fail("msg should have not been replicated to remote cluster");
+        }
+        
+        consumer1.close();
+        consumer2.close();
+        producer1.close();
+
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 7d9de6f..70ac8e4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1420,6 +1420,9 @@ public class Commands {
         if (builder.hasReplicatedFrom()) {
             messageMetadata.setReplicatedFrom(builder.getReplicatedFrom());
         }
+        if (builder.getReplicateToCount() > 0) {
+            messageMetadata.addAllReplicateTo(builder.getReplicateToList());
+        }
         if (builder.hasSchemaVersion()) {
             messageMetadata.setSchemaVersion(builder.getSchemaVersion());
         }