You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/12 01:41:25 UTC
[pulsar] branch master updated: [pulsar-client] Fix broken
replication msg to specific cluster (#4930)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 02f9fd3 [pulsar-client] Fix broken replication msg to specific cluster (#4930)
02f9fd3 is described below
commit 02f9fd3e055f404501e41413ec1081331ff945c3
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Aug 11 18:41:19 2019 -0700
[pulsar-client] Fix broken replication msg to specific cluster (#4930)
---
.../pulsar/broker/service/ReplicatorTest.java | 39 ++++++++++++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 3 ++
2 files changed, 42 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index eddb7cd..f68214a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -54,12 +54,14 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -812,6 +814,43 @@ public class ReplicatorTest extends ReplicatorTestBase {
}
+ @Test
+ public void testReplicatedCluster() throws Exception { // sends one message restricted to cluster "r1" and fails if a consumer on the second client receives it
+
+ log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
+
+ final String namespace = "pulsar/global/repl";
+ final String topicName = String.format("persistent://%s/topic1", namespace);
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); // namespace replicates across r1, r2 and r3
+ admin1.topics().createPartitionedTopic(topicName, 4); // topic is created with 4 partitions
+
+ PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) // client for url1; presumably the local (r1) cluster -- confirm against ReplicatorTestBase
+ .build();
+ PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) // client for url2; presumably a remote cluster -- confirm against ReplicatorTestBase
+ .build();
+
+ Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
+ org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); // subscription "s1" through client1
+ org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); // same subscription name, but through client2
+ byte[] value = "test".getBytes();
+
+ // publish message local only
+ TypedMessageBuilder<byte[]> msg = producer1.newMessage().replicationClusters(Lists.newArrayList("r1")).value(value); // replication list for this message contains only "r1"
+ msg.send();
+ assertEquals(consumer1.receive().getValue(), value); // the consumer on client1 must see the payload
+
+ Message<byte[]> msg2 = consumer2.receive(1, TimeUnit.SECONDS); // wait at most 1 second on client2; a non-null result fails the test below
+ if (msg2 != null) {
+ fail("msg should have not been replicated to remote cluster");
+ }
+
+ consumer1.close();
+ consumer2.close();
+ producer1.close(); // NOTE(review): client1 and client2 are not closed anywhere in this method
+
+ }
+
private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 7d9de6f..70ac8e4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1420,6 +1420,9 @@ public class Commands {
if (builder.hasReplicatedFrom()) {
messageMetadata.setReplicatedFrom(builder.getReplicatedFrom());
}
+ if (builder.getReplicateToCount() > 0) {
+ messageMetadata.addAllReplicateTo(builder.getReplicateToList());
+ }
if (builder.hasSchemaVersion()) {
messageMetadata.setSchemaVersion(builder.getSchemaVersion());
}