You are viewing a plain-text version of this content. (The link to the canonical version did not survive the plain-text conversion.)
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/04/24 17:45:46 UTC

(pulsar) branch master updated: [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21948)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b774666331d [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21948)
b774666331d is described below

commit b774666331db33ea6407174e0fe6e27a73160522
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Apr 25 01:45:41 2024 +0800

    [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21948)
---
 .../pulsar/broker/service/AbstractReplicator.java  |  10 +-
 .../apache/pulsar/broker/service/Replicator.java   |   2 +
 .../service/persistent/PersistentReplicator.java   |   9 +-
 .../broker/service/persistent/PersistentTopic.java |  58 +++++--
 .../broker/service/OneWayReplicatorTest.java       | 166 +++++++++++++++++++++
 .../broker/service/OneWayReplicatorTestBase.java   |  14 +-
 .../pulsar/broker/service/ReplicatorTest.java      |   2 +-
 7 files changed, 239 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index f34144deb0a..394fad21ae6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -248,7 +248,7 @@ public abstract class AbstractReplicator implements Replicator {
                 }
                 startProducer();
             }).exceptionally(ex -> {
-                log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
+                log.error("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
                                 + " trigger a terminate. Replicator state: {}",
                         localTopicName, replicatorId, STATE_UPDATER.get(this), ex);
                 terminate();
@@ -377,9 +377,13 @@ public abstract class AbstractReplicator implements Replicator {
             this.producer = null;
             // set the cursor as inactive.
             disableReplicatorRead();
+            // release resources.
+            doReleaseResources();
         });
     }
 
+    protected void doReleaseResources() {}
+
     protected boolean tryChangeStatusToTerminating() {
         if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){
             return true;
@@ -468,4 +472,8 @@ public abstract class AbstractReplicator implements Replicator {
         }
         return compareSetAndGetState(expect, update);
     }
+
+    public boolean isTerminated() {
+        return state == State.Terminating || state == State.Terminated;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 8130b855b4e..5c314397da8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -51,4 +51,6 @@ public interface Replicator {
     boolean isConnected();
 
     long getNumberOfEntriesInBacklog();
+
+    boolean isTerminated();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 5e1cc4a936a..367d1965207 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -450,7 +450,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
         long waitTimeMillis = readFailureBackoff.next();
 
         if (exception instanceof CursorAlreadyClosedException) {
-            log.error("[{}] Error reading entries because replicator is"
+            log.warn("[{}] Error reading entries because replicator is"
                             + " already deleted and cursor is already closed {}, ({})",
                     replicatorId, ctx, exception.getMessage(), exception);
             // replicator is already deleted and cursor is already closed so, producer should also be disconnected.
@@ -570,7 +570,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
         log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx,
                 exception.getMessage(), exception);
         if (exception instanceof CursorAlreadyClosedException) {
-            log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
+            log.warn("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
                             + " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
             // replicator is already deleted and cursor is already closed so, producer should also be disconnected.
             terminate();
@@ -698,6 +698,11 @@ public abstract class PersistentReplicator extends AbstractReplicator
         return producer != null && producer.isConnected();
     }
 
+    @Override
+    protected void doReleaseResources() {
+        dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);
 
     @VisibleForTesting
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9d6855962ce..c1a75d67e3c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1731,6 +1731,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             return deleteForcefully();
         }
 
+        removeTerminatedReplicators(replicators);
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
         // Check for missing replicators
@@ -1769,6 +1770,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         if (log.isDebugEnabled()) {
             log.debug("[{}] Checking shadow replication status, shadowTopics={}", topic, configuredShadowTopics);
         }
+
+        removeTerminatedReplicators(shadowReplicators);
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
         // Check for missing replicators
@@ -1919,19 +1922,30 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     if (replicationClient == null) {
                         return;
                     }
-                    Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
-                        try {
-                            return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
-                                    remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
-                        } catch (PulsarServerException e) {
-                            log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+                    lock.readLock().lock();
+                    try {
+                        if (isClosingOrDeleting) {
+                            // Whether is "transferring" or not, do not create new replicator.
+                            log.info("[{}] Skip to create replicator because this topic is closing."
+                                    + " remote cluster: {}. State of transferring : {}",
+                                    topic, remoteCluster, transferring);
+                            return;
                         }
-                        return null;
-                    });
-
-                    // clean up replicator if startup is failed
-                    if (replicator == null) {
-                        replicators.removeNullValue(remoteCluster);
+                        Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
+                            try {
+                                return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
+                                        remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+                            } catch (PulsarServerException e) {
+                                log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+                            }
+                            return null;
+                        });
+                        // clean up replicator if startup is failed
+                        if (replicator == null) {
+                            replicators.removeNullValue(remoteCluster);
+                        }
+                    } finally {
+                        lock.readLock().unlock();
                     }
                 });
     }
@@ -3881,9 +3895,27 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     private void unfenceTopicToResume() {
-        subscriptions.values().forEach(sub -> sub.resumeAfterFence());
         isFenced = false;
         isClosingOrDeleting = false;
+        subscriptions.values().forEach(sub -> sub.resumeAfterFence());
+        unfenceReplicatorsToResume();
+    }
+
+    private void unfenceReplicatorsToResume() {
+        checkReplication();
+        checkShadowReplication();
+    }
+
+    private void removeTerminatedReplicators(ConcurrentOpenHashMap<String, Replicator> replicators) {
+        Map<String, Replicator> terminatedReplicators = new HashMap<>();
+        replicators.forEach((cluster, replicator) -> {
+            if (replicator.isTerminated()) {
+                terminatedReplicators.put(cluster, replicator);
+            }
+        });
+        terminatedReplicators.entrySet().forEach(entry -> {
+            replicators.remove(entry.getKey(), entry.getValue());
+        });
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 35073575f34..9b8b567af08 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -20,18 +20,21 @@ package org.apache.pulsar.broker.service;
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.google.common.collect.Sets;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -48,6 +51,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
@@ -486,4 +490,166 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
         admin1.topics().deletePartitionedTopic(topicName);
         admin2.topics().deletePartitionedTopic(topicName);
     }
+
+    /**
+     * See the description and execution flow: https://github.com/apache/pulsar/pull/21948.
+     * Steps:
+     * 1.Create topic, does not enable replication now.
+     *   - The topic will be loaded in the memory.
+     * 2.Enable namespace level replication.
+     *   - Broker creates a replicator, and the internal producer of replicator is starting.
+     *   - We inject an error to make the internal producer fail to connect,after few seconds, it will retry to start.
+     * 3.Unload bundle.
+     *   - Starting to close the topic.
+     *   - The replicator will be closed, but it will not close the internal producer, because the producer has not
+     *     been created successfully.
+     *   - We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck. So the topic is still
+     *     in the process of being closed now.
+     * 4.Internal producer retry to connect.
+     *   - At the next retry, it connected successful. Since the state of "repl.cursor" is not "Closed", this producer
+     *     will not be closed now.
+     * 5.Topic closed.
+     *   - Cancel the stuck of closing the "repl.cursor".
+     *   - The topic is wholly closed.
+     * 6.Verify: the delayed created internal producer will be closed. In other words, there is no producer is connected
+     *   to the remote cluster.
+     */
+    @Test
+    public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
+        final String namespaceName = defaultTenant + "/" + UUID.randomUUID().toString().replaceAll("-", "");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp_");
+        // 1.Create topic, does not enable replication now.
+        admin1.namespaces().createNamespace(namespaceName);
+        admin2.namespaces().createNamespace(namespaceName);
+        admin1.topics().createNonPartitionedTopic(topicName);
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+        // We inject an error to make the internal producer fail to connect.
+        // The delay time of next retry to create producer is below:
+        //   0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
+        //   If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
+        final AtomicInteger createProducerCounter = new AtomicInteger();
+        final int failTimes = 6;
+        injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
+            if (topicName.equals(producerCnf.getTopicName())) {
+                // There is a switch to determine create producer successfully or not.
+                if (createProducerCounter.incrementAndGet() > failTimes) {
+                    return originalProducer;
+                }
+                log.info("Retry create replicator.producer count: {}", createProducerCounter);
+                // Release producer and fail callback.
+                originalProducer.closeAsync();
+                throw new RuntimeException("mock error");
+            }
+            return originalProducer;
+        });
+
+        // 2.Enable namespace level replication.
+        admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1, cluster2));
+        AtomicReference<PersistentReplicator> replicator = new AtomicReference<PersistentReplicator>();
+        Awaitility.await().untilAsserted(() -> {
+            assertFalse(persistentTopic.getReplicators().isEmpty());
+            replicator.set(
+                    (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next());
+            // Since we inject a producer creation error, the replicator can not start successfully.
+            assertFalse(replicator.get().isConnected());
+        });
+
+        // We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck, until the internal
+        // producer of the replicator started.
+        SpyCursor spyCursor =
+                spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName());
+        CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor);
+
+        // 3.Unload bundle: call "topic.close(false)".
+        // Stuck start new producer, until the state of replicator change to Stopped.
+        // The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully.
+        Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
+            assertTrue(createProducerCounter.get() >= failTimes);
+        });
+        CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
+        Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+            String state = String.valueOf(replicator.get().getState());
+            log.error("replicator state: {}", state);
+            assertTrue(state.equals("Disconnected") || state.equals("Terminated"));
+        });
+
+        // 5.Delay close cursor, until "replicator.producer" create successfully.
+        // The next once retry time of create "replicator.producer" will be 3.2s.
+        Thread.sleep(4 * 1000);
+        log.info("Replicator.state: {}", replicator.get().getState());
+        cursorCloseSignal.startClose();
+        cursorCloseSignal.startCallback();
+        // Wait for topic close successfully.
+        topicCloseFuture.join();
+
+        // 6. Verify there is no orphan producer on the remote cluster.
+        Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
+            PersistentTopic persistentTopic2 =
+                    (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+            assertEquals(persistentTopic2.getProducers().size(), 0);
+            Assert.assertFalse(replicator.get().isConnected());
+        });
+
+        // cleanup.
+        cleanupTopics(namespaceName, () -> {
+            admin1.topics().delete(topicName);
+            admin2.topics().delete(topicName);
+        });
+        admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1));
+        admin1.namespaces().deleteNamespace(namespaceName);
+        admin2.namespaces().deleteNamespace(namespaceName);
+    }
+
+    @Test
+    public void testUnFenceTopicToReuse() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
+        // Wait for replicator started.
+        Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
+        waitReplicatorStarted(topicName);
+
+        // Inject an error to make topic close fails.
+        final String mockProducerName = UUID.randomUUID().toString();
+        final org.apache.pulsar.broker.service.Producer mockProducer =
+                mock(org.apache.pulsar.broker.service.Producer.class);
+        doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
+                .when(mockProducer).disconnect(any());
+        doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
+                .when(mockProducer).disconnect();
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+        persistentTopic.getProducers().put(mockProducerName, mockProducer);
+
+        // Do close.
+        GeoPersistentReplicator replicator1 =
+                (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
+        try {
+            persistentTopic.close(true, false).join();
+            fail("Expected close fails due to a producer close fails");
+        } catch (Exception ex) {
+            log.info("Expected error: {}", ex.getMessage());
+        }
+
+        // Broker will call `topic.unfenceTopicToResume` if close clients fails.
+        // Verify: the replicator will be re-created.
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(producer1.isConnected());
+            GeoPersistentReplicator replicator2 =
+                    (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
+            assertNotEquals(replicator1, replicator2);
+            assertFalse(replicator1.isConnected());
+            assertFalse(replicator1.producer != null && replicator1.producer.isConnected());
+            assertTrue(replicator2.isConnected());
+            assertTrue(replicator2.producer != null && replicator2.producer.isConnected());
+        });
+
+        // cleanup.
+        persistentTopic.getProducers().remove(mockProducerName, mockProducer);
+        producer1.close();
+        cleanupTopics(() -> {
+            admin1.topics().delete(topicName);
+            admin2.topics().delete(topicName);
+        });
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 181721e34aa..95f976f965a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -150,12 +150,16 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
     }
 
     protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception {
-        waitChangeEventsInit(replicatedNamespace);
-        admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Collections.singleton(cluster1));
-        admin1.namespaces().unload(replicatedNamespace);
+        cleanupTopics(replicatedNamespace, cleanupTopicAction);
+    }
+
+    protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAction) throws Exception {
+        waitChangeEventsInit(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1));
+        admin1.namespaces().unload(namespace);
         cleanupTopicAction.run();
-        admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1, cluster2));
-        waitChangeEventsInit(replicatedNamespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2));
+        waitChangeEventsInit(namespace);
     }
 
     protected void waitChangeEventsInit(String namespace) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index a05c3468ea1..0bfcdf563d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -152,7 +152,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
 
-    @Test
+    @Test(priority = Integer.MAX_VALUE)
     public void testConfigChange() throws Exception {
         log.info("--- Starting ReplicatorTest::testConfigChange ---");
         // This test is to verify that the config change on global namespace is successfully applied in broker during