You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/11/28 15:29:15 UTC

(pulsar) branch branch-3.1 updated: [improve] [broker] Let the producer request success at the first time if the previous one is inactive (#21220)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9bc415dc433 [improve] [broker] Let the producer request success at the first time if the previous one is inactive (#21220)
9bc415dc433 is described below

commit 9bc415dc43373a6d22a4277b31d29f58237679c0
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Nov 2 13:58:31 2023 +0800

    [improve] [broker] Let the producer request success at the first time if the previous one is inactive (#21220)
    
    If a producer reconnects over a new connection while its previous connection has become inactive, the first request made on the new connection fails.  That failure triggers the topic to clean up its inactive producers, so only a second request from the producer succeeds and lets the operation proceed.
    
    Make the initial request on the new connection succeed.
    
    (cherry picked from commit 711b621bc2609be6fc207fedbe646d7cd14eadc2)
---
 .../pulsar/broker/service/AbstractTopic.java       |  56 +++++---
 .../pulsar/broker/service/ServerCnxTest.java       | 147 +++++++++++++++++++--
 .../broker/service/utils/ClientChannelHelper.java  |  12 ++
 3 files changed, 183 insertions(+), 32 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 31e37d0f176..90ca196792b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -703,15 +703,14 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
                             log.warn("[{}] Attempting to add producer to a terminated topic", topic);
                             throw new TopicTerminatedException("Topic was already terminated");
                         }
-                        internalAddProducer(producer);
-
-                        USAGE_COUNT_UPDATER.incrementAndGet(this);
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
-                                    USAGE_COUNT_UPDATER.get(this));
-                        }
-
-                        return CompletableFuture.completedFuture(producerEpoch);
+                        return internalAddProducer(producer).thenApply(ignore -> {
+                            USAGE_COUNT_UPDATER.incrementAndGet(this);
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
+                                        USAGE_COUNT_UPDATER.get(this));
+                            }
+                            return producerEpoch;
+                        });
                     } catch (BrokerServiceException e) {
                         return FutureUtil.failedFuture(e);
                     } finally {
@@ -957,15 +956,17 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         }
     }
 
-    protected void internalAddProducer(Producer producer) throws BrokerServiceException {
+    protected CompletableFuture<Void> internalAddProducer(Producer producer) {
         if (isProducersExceeded(producer)) {
             log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
-            throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
+            return CompletableFuture.failedFuture(
+                    new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"));
         }
 
         if (isSameAddressProducersExceeded(producer)) {
             log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
-            throw new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit");
+            return CompletableFuture.failedFuture(
+                    new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"));
         }
 
         if (log.isDebugEnabled()) {
@@ -974,31 +975,46 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
 
         Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
         if (existProducer != null) {
-            tryOverwriteOldProducer(existProducer, producer);
+            return tryOverwriteOldProducer(existProducer, producer);
         } else if (!producer.isRemote()) {
             USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
         }
+        return CompletableFuture.completedFuture(null);
     }
 
-    private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
-            throws BrokerServiceException {
+    private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) {
         if (newProducer.isSuccessorTo(oldProducer)) {
             oldProducer.close(false);
             if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
                 // Met concurrent update, throw exception here so that client can try reconnect later.
-                throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
-                        + "' replace concurrency error");
+                return CompletableFuture.failedFuture(new BrokerServiceException.NamingException("Producer with name '"
+                        + newProducer.getProducerName() + "' replace concurrency error"));
             } else {
                 handleProducerRemoved(oldProducer);
+                return CompletableFuture.completedFuture(null);
             }
         } else {
             // If a producer with the same name tries to use a new connection, async check the old connection is
             // available. The producers related the connection that not available are automatically cleaned up.
             if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
-                oldProducer.getCnx().checkConnectionLiveness();
+                return oldProducer.getCnx().checkConnectionLiveness().thenCompose(previousIsActive -> {
+                    if (previousIsActive) {
+                        return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
+                                "Producer with name '" + newProducer.getProducerName()
+                                        + "' is already connected to topic"));
+                    } else {
+                        // If the connection of the previous producer is not active, the method
+                        // "cnx().checkConnectionLiveness()" will trigger the close for it and kick off the previous
+                        // producer, so try to add the current producer again.
+                        // The recursion stops in one of these two cases (this prevents infinite calls):
+                        //   1. the current producer is added successfully.
+                        //   2. another producer with the same name gets registered.
+                        return internalAddProducer(newProducer);
+                    }
+                });
             }
-            throw new BrokerServiceException.NamingException(
-                    "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
+            return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
+                    "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"));
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 5fd48819813..8abd6dcff8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -45,6 +45,7 @@ import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.DefaultChannelId;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.vertx.core.impl.ConcurrentHashSet;
@@ -62,10 +63,14 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -973,7 +978,7 @@ public class ServerCnxTest {
     }
 
     @Test
-    public void testHandleProducerAfterClientChannelInactive() throws Exception {
+    public void testDuplicateProducer() throws Exception {
         final String tName = successTopicName;
         final long producerId = 1;
         final MutableInt requestId = new MutableInt(1);
@@ -993,33 +998,131 @@ public class ServerCnxTest {
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
 
-        // Verify the second producer using a new connection will override the producer who using a stopped channel.
-        channelsStoppedAnswerHealthCheck.add(channel);
+        // Verify the second producer is rejected because the previous one is still active.
+        // Try 10 times, pausing 500 ms between attempts; every request should fail.
         ClientChannel channel2 = new ClientChannel();
+        BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel);
+        BackGroundExecutor autoResponseForHeartBeat = autoResponseForHeartBeat(channel, clientChannelHelper);
+        BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel);
         setChannelConnected(channel2.serverCnx);
-        Awaitility.await().untilAsserted(() -> {
+
+        for (int i = 0; i < 10; i++) {
             ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
                     pName, false, metadata, null, epoch.incrementAndGet(), false,
                     ProducerAccessMode.Shared, Optional.empty(), false);
             channel2.channel.writeInbound(cmdProducer2);
-            assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) instanceof CommandProducerSuccess);
+            Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
+            assertTrue(response2 instanceof CommandError);
             assertEquals(topicRef.getProducers().size(), 1);
-        });
+            assertTrue(channel.isActive());
+            Thread.sleep(500);
+        }
 
         // cleanup.
+        autoResponseForHeartBeat.close();
+        backGroundExecutor1.close();
+        backGroundExecutor2.close();
         channel.finish();
         channel2.close();
     }
 
+    @Test
+    public void testProducerChangeSocket() throws Exception {
+        final String tName = successTopicName;
+        final long producerId = 1;
+        final MutableInt requestId = new MutableInt(1);
+        final MutableInt epoch = new MutableInt(1);
+        final Map<String, String> metadata = Collections.emptyMap();
+        final String pName = "p1";
+        resetChannel();
+        setChannelConnected();
+
+        // The producer register using the first connection.
+        ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        channel.writeInbound(cmdProducer1);
+        assertTrue(getResponse() instanceof CommandProducerSuccess);
+        PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).get();
+        assertNotNull(topicRef);
+        assertEquals(topicRef.getProducers().size(), 1);
+
+        // Verify the second producer using a new connection will override the producer that is using a stopped channel.
+        channelsStoppedAnswerHealthCheck.add(channel);
+        ClientChannel channel2 = new ClientChannel();
+        BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel);
+        BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel);
+        setChannelConnected(channel2.serverCnx);
+
+        ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        channel2.channel.writeInbound(cmdProducer2);
+        Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
+        assertTrue(response2 instanceof CommandProducerSuccess);
+        assertEquals(topicRef.getProducers().size(), 1);
+
+        // cleanup.
+        channelsStoppedAnswerHealthCheck.clear();
+        backGroundExecutor1.close();
+        backGroundExecutor2.close();
+        channel.finish();
+        channel2.close();
+    }
+
+    /**
+     * An "EmbeddedChannel" has no background thread, so a task submitted via channel.execute(runnable) is
+     * not run on its own.
+     * Start a background thread that periodically runs the pending tasks in the queue.
+     */
+    private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final EmbeddedChannel channel) {
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(() -> {
+            channel.runPendingTasks();
+        }, 100, 100, TimeUnit.MILLISECONDS);
+        return new BackGroundExecutor(executor, scheduledFuture);
+    }
+
+    /**
+     * Auto answer `Pong` for the `Cmd-Ping`.
+     * Note: This results in an additional thread popping Commands from the Command queue, so do not call this
+     * method if the channel needs to accept other Commands.
+     */
+    private BackGroundExecutor autoResponseForHeartBeat(EmbeddedChannel channel,
+                                                        ClientChannelHelper clientChannelHelper) {
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(() -> {
+            tryPeekResponse(channel, clientChannelHelper);
+        }, 100, 100, TimeUnit.MILLISECONDS);
+        return new BackGroundExecutor(executor, scheduledFuture);
+    }
+
+    @AllArgsConstructor
+    private static class BackGroundExecutor implements Closeable {
+
+        private ScheduledExecutorService executor;
+
+        private ScheduledFuture scheduledFuture;
+
+        @Override
+        public void close() throws IOException {
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(true);
+            }
+            executor.shutdown();
+        }
+    }
+
     private class ClientChannel implements Closeable {
         private ClientChannelHelper clientChannelHelper = new ClientChannelHelper();
         private ServerCnx serverCnx = new ServerCnx(pulsar);
-        private EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
-                5 * 1024 * 1024,
-                0,
-                4,
-                0,
-                4),
+        private EmbeddedChannel channel = new EmbeddedChannel(DefaultChannelId.newInstance(),
+                new LengthFieldBasedFrameDecoder(
+                        5 * 1024 * 1024,
+                        0,
+                        4,
+                        0,
+                        4),
                 serverCnx);
         public ClientChannel() {
             serverCnx.setAuthRole("");
@@ -2694,6 +2797,26 @@ public class ServerCnxTest {
         throw new IOException("Failed to get response from socket within 10s");
     }
 
+    protected Object tryPeekResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper) {
+        while (true) {
+            if (channel.outboundMessages().isEmpty()) {
+                return null;
+            } else {
+                Object outObject = channel.outboundMessages().peek();
+                Object cmd = clientChannelHelper.getCommand(outObject);
+                if (cmd instanceof CommandPing) {
+                    if (channelsStoppedAnswerHealthCheck.contains(channel)) {
+                        continue;
+                    }
+                    channel.writeInbound(Commands.newPong());
+                    channel.outboundMessages().remove();
+                    continue;
+                }
+                return cmd;
+            }
+        }
+    }
+
     private void setupMLAsyncCallbackMocks() {
         ledgerMock = mock(ManagedLedger.class);
         cursorMock = mock(ManagedCursor.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index bf0dd3aa9c1..c8fce32efc5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -27,6 +27,8 @@ import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
 import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
+import org.apache.pulsar.common.api.proto.CommandPing;
+import org.apache.pulsar.common.api.proto.CommandPong;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.protocol.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.CommandAck;
@@ -207,6 +209,16 @@ public class ClientChannelHelper {
                 CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) {
             queue.offer(new CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse));
         }
+
+        @Override
+        protected void handlePing(CommandPing ping) {
+            queue.offer(new CommandPing().copyFrom(ping));
+        }
+
+        @Override
+        protected void handlePong(CommandPong pong) {
+            return;
+        }
     };
 
 }