You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/04/09 19:57:57 UTC
[pulsar] branch master updated: Fix IllegalStateException in
PersistentReplicator (#10098)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 05f1f7e Fix IllegalStateException in PersistentReplicator (#10098)
05f1f7e is described below
commit 05f1f7e58fc54648e7843335dfb64d413d9b6a1e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 9 22:57:16 2021 +0300
Fix IllegalStateException in PersistentReplicator (#10098)
Fixes: #10097
### Motivation
See #10097 for the issue. It seems that the code broke when the switch was made to LightProto in #9046.
### Modifications
It is necessary to use `msg.getMessageBuilder().hasReplicatedFrom()` and use logic that only calls `msg.getMessageBuilder().getReplicatedFrom()` if `hasReplicatedFrom()` returns true.
---
.../service/persistent/PersistentReplicator.java | 3 +-
.../broker/service/ReplicatorSubscriptionTest.java | 170 +++++++++++++++++++++
.../pulsar/broker/service/ReplicatorTestBase.java | 76 ++++-----
3 files changed, 198 insertions(+), 51 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index e1f6949..282dec5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -760,7 +760,8 @@ public class PersistentReplicator extends AbstractReplicator
int markerType = msg.getMessageBuilder().getMarkerType();
- if (!remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom())) {
+ if (!(msg.getMessageBuilder().hasReplicatedFrom()
+ && remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom()))) {
// Only consider markers that are coming from the same cluster that this
// replicator instance is assigned to.
// All the replicators will see all the markers, but we need to only process
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
new file mode 100644
index 0000000..2dfa0fe
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests replicated subscriptions (PIP-33)
+ */
+@Test(groups = "broker")
+public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
+ private static final Logger log = LoggerFactory.getLogger(ReplicatorSubscriptionTest.class);
+
+ @Override
+ @BeforeClass(timeOut = 300000)
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ /**
+ * Tests replicated subscriptions across two regions
+ */
+ @Test
+ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
+ String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+ String topicName = "persistent://" + namespace + "/mytopic";
+ String subscriptionName = "cluster-subscription";
+ // Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054
+ // TODO: duplications shouldn't be allowed, change to "false" when fixing the issue
+ boolean allowDuplicates = true;
+ // this setting can be used to manually run the test with subscription replication disabled
+ // it shows that subscription replication has no impact in behavior for this test case
+ boolean replicateSubscriptionState = true;
+
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+
+ // create subscription in r1
+ createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);
+
+ @Cleanup
+ PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+
+ // create subscription in r2
+ createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);
+
+ Set<String> sentMessages = new LinkedHashSet<>();
+
+ // send messages in r1
+ {
+ @Cleanup
+ Producer<byte[]> producer = client1.newProducer().topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ int numMessages = 6;
+ for (int i = 0; i < numMessages; i++) {
+ String body = "message" + i;
+ producer.send(body.getBytes(StandardCharsets.UTF_8));
+ sentMessages.add(body);
+ }
+ producer.close();
+ }
+
+ Set<String> receivedMessages = new LinkedHashSet<>();
+
+ // consume 3 messages in r1
+ try (Consumer<byte[]> consumer1 = client1.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .replicateSubscriptionState(replicateSubscriptionState)
+ .subscribe()) {
+ readMessages(consumer1, receivedMessages, 3, allowDuplicates);
+ }
+
+ // wait for subscription to be replicated
+ Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+
+ // consume remaining messages in r2
+ try (Consumer<byte[]> consumer2 = client2.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .replicateSubscriptionState(replicateSubscriptionState)
+ .subscribe()) {
+ readMessages(consumer2, receivedMessages, -1, allowDuplicates);
+ }
+
+ // assert that all messages have been received
+ assertEquals(new ArrayList<>(sentMessages), new ArrayList<>(receivedMessages), "Sent and received " +
+ "messages don't match.");
+ }
+
+ void readMessages(Consumer<byte[]> consumer, Set<String> messages, int maxMessages, boolean allowDuplicates)
+ throws PulsarClientException {
+ int count = 0;
+ while (count < maxMessages || maxMessages == -1) {
+ Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+ if (message != null) {
+ count++;
+ String body = new String(message.getValue(), StandardCharsets.UTF_8);
+ if (!allowDuplicates) {
+ assertFalse(messages.contains(body), "Duplicate message '" + body + "' detected.");
+ }
+ messages.add(body);
+ } else {
+ break;
+ }
+ }
+ }
+
+ void createReplicatedSubscription(PulsarClient pulsarClient, String topicName, String subscriptionName,
+ boolean replicateSubscriptionState)
+ throws PulsarClientException {
+ pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName(subscriptionName)
+ .replicateSubscriptionState(replicateSubscriptionState)
+ .subscribe()
+ .close();
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 08b3627..ea1fb42 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -185,64 +185,40 @@ public class ReplicatorTestBase extends TestRetrySupport {
}
- private void setConfig3DefaultValue() {
- config3.setClusterName("r3");
- config3.setAdvertisedAddress("localhost");
- config3.setWebServicePort(Optional.of(0));
- config3.setWebServicePortTls(Optional.of(0));
- config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort());
- config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
- config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
- config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
- inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
- config3.setBrokerServicePort(Optional.of(0));
- config3.setBrokerServicePortTls(Optional.of(0));
+ public void setConfig3DefaultValue() {
+ setConfigDefaults(config3, "r3", bkEnsemble3);
config3.setTlsEnabled(true);
- config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
- config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
- config3.setDefaultNumberOfNamespaceBundles(1);
- config3.setAllowAutoTopicCreationType("non-partitioned");
}
public void setConfig1DefaultValue(){
- config1.setClusterName("r1");
- config1.setAdvertisedAddress("localhost");
- config1.setWebServicePort(Optional.of(0));
- config1.setWebServicePortTls(Optional.of(0));
- config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort());
- config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
- config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
- config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
- inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
- config1.setBrokerServicePort(Optional.of(0));
- config1.setBrokerServicePortTls(Optional.of(0));
- config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
- config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
- config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
- config1.setDefaultNumberOfNamespaceBundles(1);
- config1.setAllowAutoTopicCreationType("non-partitioned");
+ setConfigDefaults(config1, "r1", bkEnsemble1);
}
public void setConfig2DefaultValue() {
- config2.setClusterName("r2");
- config2.setAdvertisedAddress("localhost");
- config2.setWebServicePort(Optional.of(0));
- config2.setWebServicePortTls(Optional.of(0));
- config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort());
- config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
- config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
- config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
+ setConfigDefaults(config2, "r2", bkEnsemble2);
+ }
+
+ private void setConfigDefaults(ServiceConfiguration config, String clusterName,
+ LocalBookkeeperEnsemble bookkeeperEnsemble) {
+ config.setClusterName(clusterName);
+ config.setAdvertisedAddress("localhost");
+ config.setWebServicePort(Optional.of(0));
+ config.setWebServicePortTls(Optional.of(0));
+ config.setZookeeperServers("127.0.0.1:" + bookkeeperEnsemble.getZookeeperPort());
+ config.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
+ config.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
+ config.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
- config2.setBrokerServicePort(Optional.of(0));
- config2.setBrokerServicePortTls(Optional.of(0));
- config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
- config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
- config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
- config2.setDefaultNumberOfNamespaceBundles(1);
- config2.setAllowAutoTopicCreationType("non-partitioned");
+ config.setBrokerServicePort(Optional.of(0));
+ config.setBrokerServicePortTls(Optional.of(0));
+ config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+ config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+ config.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+ config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+ config.setDefaultNumberOfNamespaceBundles(1);
+ config.setAllowAutoTopicCreationType("non-partitioned");
+ config.setEnableReplicatedSubscriptions(true);
+ config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
}
public void resetConfig1() {