You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/04/09 19:57:57 UTC

[pulsar] branch master updated: Fix IllegalStateException in PersistentReplicator (#10098)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 05f1f7e  Fix IllegalStateException in PersistentReplicator (#10098)
05f1f7e is described below

commit 05f1f7e58fc54648e7843335dfb64d413d9b6a1e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 9 22:57:16 2021 +0300

    Fix IllegalStateException in PersistentReplicator (#10098)
    
    Fixes: #10097
    
    ### Motivation
    
    See #10097 for the issue. It seems that the code broke when the switch was made to LightProto in #9046.
    
    ### Modifications
    
    It is necessary to use `msg.getMessageBuilder().hasReplicatedFrom()` and use logic that only calls `msg.getMessageBuilder().getReplicatedFrom()` if `hasReplicatedFrom()` returns true.
---
 .../service/persistent/PersistentReplicator.java   |   3 +-
 .../broker/service/ReplicatorSubscriptionTest.java | 170 +++++++++++++++++++++
 .../pulsar/broker/service/ReplicatorTestBase.java  |  76 ++++-----
 3 files changed, 198 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index e1f6949..282dec5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -760,7 +760,8 @@ public class PersistentReplicator extends AbstractReplicator
 
         int markerType = msg.getMessageBuilder().getMarkerType();
 
-        if (!remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom())) {
+        if (!(msg.getMessageBuilder().hasReplicatedFrom()
+                && remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom()))) {
             // Only consider markers that are coming from the same cluster that this
             // replicator instance is assigned to.
             // All the replicators will see all the markers, but we need to only process
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
new file mode 100644
index 0000000..2dfa0fe
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests replicated subscriptions (PIP-33)
+ */
+@Test(groups = "broker")
+public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
+    private static final Logger log = LoggerFactory.getLogger(ReplicatorSubscriptionTest.class);
+
+    @Override
+    @BeforeClass(timeOut = 300000)
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    /**
+     * Tests replicated subscriptions across two regions
+     */
+    @Test
+    public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
+        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+        String topicName = "persistent://" + namespace + "/mytopic";
+        String subscriptionName = "cluster-subscription";
+        // Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054
+        // TODO: duplications shouldn't be allowed, change to "false" when fixing the issue
+        boolean allowDuplicates = true;
+        // this setting can be used to manually run the test with subscription replication disabled
+        // it shows that subscription replication has no impact in behavior for this test case
+        boolean replicateSubscriptionState = true;
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r1
+        createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);
+
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r2
+        createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);
+
+        Set<String> sentMessages = new LinkedHashSet<>();
+
+        // send messages in r1
+        {
+            @Cleanup
+            Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                    .enableBatching(false)
+                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                    .create();
+            int numMessages = 6;
+            for (int i = 0; i < numMessages; i++) {
+                String body = "message" + i;
+                producer.send(body.getBytes(StandardCharsets.UTF_8));
+                sentMessages.add(body);
+            }
+            producer.close();
+        }
+
+        Set<String> receivedMessages = new LinkedHashSet<>();
+
+        // consume 3 messages in r1
+        try (Consumer<byte[]> consumer1 = client1.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(replicateSubscriptionState)
+                .subscribe()) {
+            readMessages(consumer1, receivedMessages, 3, allowDuplicates);
+        }
+
+        // wait for subscription to be replicated
+        Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+
+        // consume remaining messages in r2
+        try (Consumer<byte[]> consumer2 = client2.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(replicateSubscriptionState)
+                .subscribe()) {
+            readMessages(consumer2, receivedMessages, -1, allowDuplicates);
+        }
+
+        // assert that all messages have been received
+        assertEquals(new ArrayList<>(sentMessages), new ArrayList<>(receivedMessages), "Sent and received " +
+                "messages don't match.");
+    }
+
+    void readMessages(Consumer<byte[]> consumer, Set<String> messages, int maxMessages, boolean allowDuplicates)
+            throws PulsarClientException {
+        int count = 0;
+        while (count < maxMessages || maxMessages == -1) {
+            Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+            if (message != null) {
+                count++;
+                String body = new String(message.getValue(), StandardCharsets.UTF_8);
+                if (!allowDuplicates) {
+                    assertFalse(messages.contains(body), "Duplicate message '" + body + "' detected.");
+                }
+                messages.add(body);
+            } else {
+                break;
+            }
+        }
+    }
+
+    void createReplicatedSubscription(PulsarClient pulsarClient, String topicName, String subscriptionName,
+                                      boolean replicateSubscriptionState)
+            throws PulsarClientException {
+        pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(replicateSubscriptionState)
+                .subscribe()
+                .close();
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 08b3627..ea1fb42 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -185,64 +185,40 @@ public class ReplicatorTestBase extends TestRetrySupport {
 
     }
 
-    private void setConfig3DefaultValue() {
-        config3.setClusterName("r3");
-        config3.setAdvertisedAddress("localhost");
-        config3.setWebServicePort(Optional.of(0));
-        config3.setWebServicePortTls(Optional.of(0));
-        config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort());
-        config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
-        config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
-                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config3.setBrokerServicePort(Optional.of(0));
-        config3.setBrokerServicePortTls(Optional.of(0));
+    public void setConfig3DefaultValue() {
+        setConfigDefaults(config3, "r3", bkEnsemble3);
         config3.setTlsEnabled(true);
-        config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config3.setDefaultNumberOfNamespaceBundles(1);
-        config3.setAllowAutoTopicCreationType("non-partitioned");
     }
 
     public void setConfig1DefaultValue(){
-        config1.setClusterName("r1");
-        config1.setAdvertisedAddress("localhost");
-        config1.setWebServicePort(Optional.of(0));
-        config1.setWebServicePortTls(Optional.of(0));
-        config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort());
-        config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
-        config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
-                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config1.setBrokerServicePort(Optional.of(0));
-        config1.setBrokerServicePortTls(Optional.of(0));
-        config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
-        config1.setDefaultNumberOfNamespaceBundles(1);
-        config1.setAllowAutoTopicCreationType("non-partitioned");
+        setConfigDefaults(config1, "r1", bkEnsemble1);
     }
 
     public void setConfig2DefaultValue() {
-        config2.setClusterName("r2");
-        config2.setAdvertisedAddress("localhost");
-        config2.setWebServicePort(Optional.of(0));
-        config2.setWebServicePortTls(Optional.of(0));
-        config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort());
-        config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
-        config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
+        setConfigDefaults(config2, "r2", bkEnsemble2);
+    }
+
+    private void setConfigDefaults(ServiceConfiguration config, String clusterName,
+                                   LocalBookkeeperEnsemble bookkeeperEnsemble) {
+        config.setClusterName(clusterName);
+        config.setAdvertisedAddress("localhost");
+        config.setWebServicePort(Optional.of(0));
+        config.setWebServicePortTls(Optional.of(0));
+        config.setZookeeperServers("127.0.0.1:" + bookkeeperEnsemble.getZookeeperPort());
+        config.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
+        config.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
+        config.setBrokerDeleteInactiveTopicsFrequencySeconds(
                 inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config2.setBrokerServicePort(Optional.of(0));
-        config2.setBrokerServicePortTls(Optional.of(0));
-        config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
-        config2.setDefaultNumberOfNamespaceBundles(1);
-        config2.setAllowAutoTopicCreationType("non-partitioned");
+        config.setBrokerServicePort(Optional.of(0));
+        config.setBrokerServicePortTls(Optional.of(0));
+        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+        config.setDefaultNumberOfNamespaceBundles(1);
+        config.setAllowAutoTopicCreationType("non-partitioned");
+        config.setEnableReplicatedSubscriptions(true);
+        config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
     }
 
     public void resetConfig1() {