You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/16 16:15:30 UTC

[pulsar] branch branch-2.8 updated (ff51a02 -> 28a223c)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from ff51a02  Fix cherry-pick issue on branch-2.8 (#11982)
     new da387c0  Force Python CI to use earlier version of Protobuf which supports Python2 (#12058)
     new 459f278  [Client] Fix endless receiveAsync loop in MultiTopicsConsumer (#12044)
     new 6d3966c  Fixed ProxyConnection to check for existence of auth_data field (#12057)
     new 28a223c  Improved logic for pausing replicated subscription snapshots when no traffic (#11922)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/service/Producer.java |  35 ++++--
 .../apache/pulsar/broker/service/ServerCnx.java    |   7 +-
 .../org/apache/pulsar/broker/service/Topic.java    |   4 +
 .../broker/service/persistent/PersistentTopic.java |  13 +++
 .../ReplicatedSubscriptionsController.java         |  41 +++----
 .../ReplicatedSubscriptionsSnapshotBuilder.java    |   4 +
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 122 +++++++++++++++++++++
 pulsar-client-cpp/run-unit-tests.sh                |   4 +
 .../client/impl/MultiTopicsConsumerImpl.java       |   5 +
 .../apache/pulsar/common/protocol/Commands.java    |   4 +
 pulsar-common/src/main/proto/PulsarApi.proto       |   3 +
 .../pulsar/proxy/server/ProxyConnection.java       |   6 +-
 12 files changed, 206 insertions(+), 42 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java

[pulsar] 02/04: [Client] Fix endless receiveAsync loop in MultiTopicsConsumer (#12044)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 459f278c43102059aaffba35ba2ac402027ebf05
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Sep 16 09:18:05 2021 +0300

    [Client] Fix endless receiveAsync loop in MultiTopicsConsumer (#12044)
    
    Fixes #12024
---
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 122 +++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |   5 +
 2 files changed, 127 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
new file mode 100644
index 0000000..715f3ad
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import com.google.common.collect.Lists;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Cleanup;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.mockito.AdditionalAnswers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class MultiTopicsConsumerTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerTest.class);
+    private ScheduledExecutorService internalExecutorServiceDelegate;
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
+        ClientConfigurationData conf =
+                ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
+        return new PulsarClientImpl(conf) {
+            {
+                ScheduledExecutorService internalExecutorService =
+                        (ScheduledExecutorService) super.getInternalExecutorService();
+                internalExecutorServiceDelegate = mock(ScheduledExecutorService.class,
+                        // a spy isn't used since that doesn't work for private classes, instead
+                        // the mock delegatesTo an existing instance. A delegate is sufficient for verifying
+                        // method calls on the interface.
+                        Mockito.withSettings().defaultAnswer(AdditionalAnswers.delegatesTo(internalExecutorService)));
+            }
+            @Override
+            public ExecutorService getInternalExecutorService() {
+                return internalExecutorServiceDelegate;
+            }
+        };
+    }
+
+    // test that reproduces the issue https://github.com/apache/pulsar/issues/12024
+    // where closing the consumer leads to an endless receive loop
+    @Test
+    public void testMultiTopicsConsumerCloses() throws Exception {
+        String topicNameBase = "persistent://my-property/my-ns/my-topic-consumer-closes-";
+
+        @Cleanup
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+                .topic(topicNameBase + "1")
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        Producer<byte[]> producer2 = pulsarClient.newProducer()
+                .topic(topicNameBase + "2")
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        Producer<byte[]> producer3 = pulsarClient.newProducer()
+                .topic(topicNameBase + "3")
+                .enableBatching(false)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .topics(Lists.newArrayList(topicNameBase + "1", topicNameBase + "2", topicNameBase + "3"))
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .receiverQueueSize(1)
+                .subscriptionName(methodName)
+                .subscribe();
+
+        // wait for background tasks to start
+        Thread.sleep(1000L);
+
+        // when consumer is closed
+        consumer.close();
+        // give time for background tasks to execute
+        Thread.sleep(1000L);
+
+        // then verify that no scheduling operation has happened
+        verify(internalExecutorServiceDelegate, times(0))
+                .schedule(any(Runnable.class), anyLong(), any());
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 68bdf38..3b06ffa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -270,6 +270,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
             }
         }).exceptionally(ex -> {
+            if (ex instanceof PulsarClientException.AlreadyClosedException
+                    || ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
+                // ignore the exception that happens when the consumer is closed
+                return null;
+            }
             log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex);
             internalPinnedExecutor.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
             return null;

[pulsar] 01/04: Force Python CI to use earlier version of Protobuf which supports Python2 (#12058)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit da387c00f2c78492dfa24c0065516184d9dc741f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Sep 15 23:16:31 2021 -0700

    Force Python CI to use earlier version of Protobuf which supports Python2 (#12058)
---
 pulsar-client-cpp/run-unit-tests.sh | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh
index b8d6a8e..8dd8c00 100755
--- a/pulsar-client-cpp/run-unit-tests.sh
+++ b/pulsar-client-cpp/run-unit-tests.sh
@@ -64,6 +64,10 @@ if [ $RES -eq 0 ]; then
     WHEEL_FILE=$(ls dist/ | grep whl)
     echo "${WHEEL_FILE}"
     echo "dist/${WHEEL_FILE}[all]"
+    # Protobuf 3.18 only works with Python3. Since we're still using Python2 in CI, 
+    # let's pin the Python version to the previous one
+    pip install protobuf==3.17.3
+
     pip install dist/${WHEEL_FILE}[all]
 
     echo "---- Running Python unit tests"

[pulsar] 04/04: Improved logic for pausing replicated subscription snapshots when no traffic (#11922)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 28a223c50009c99d040c0a9f1f4702c75d026f38
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 16 08:28:36 2021 -0700

    Improved logic for pausing replicated subscription snapshots when no traffic (#11922)
    
    * Improved logic for pausing replicated subscription snapshots when no traffic
    
    * Removed unused import
    
    * Fixed flaky test ReplicatorTest.testRemoveClusterFromNamespace
    
    * Fixed cast that was not available in tests
---
 .../org/apache/pulsar/broker/service/Producer.java | 35 +++++++++++-------
 .../apache/pulsar/broker/service/ServerCnx.java    |  7 ++--
 .../org/apache/pulsar/broker/service/Topic.java    |  4 +++
 .../broker/service/persistent/PersistentTopic.java | 13 +++++++
 .../ReplicatedSubscriptionsController.java         | 41 +++++++++-------------
 .../ReplicatedSubscriptionsSnapshotBuilder.java    |  4 +++
 .../apache/pulsar/common/protocol/Commands.java    |  4 +++
 pulsar-common/src/main/proto/PulsarApi.proto       |  3 ++
 8 files changed, 71 insertions(+), 40 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 12697f6..d13e2f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -157,14 +157,14 @@ public class Producer {
     }
 
     public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
-            boolean isChunked) {
+            boolean isChunked, boolean isMarker) {
         if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) {
-            publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked);
+            publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker);
         }
     }
 
     public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
-            ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
+            ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
         if (lowestSequenceId > highestSequenceId) {
             cnx.execute(() -> {
                 cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError,
@@ -174,7 +174,8 @@ public class Producer {
             return;
         }
         if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) {
-            publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked);
+            publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked,
+                    isMarker);
         }
     }
 
@@ -219,19 +220,20 @@ public class Producer {
         return true;
     }
 
-    private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked) {
+    private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
+                                       boolean isMarker) {
         topic.publishMessage(headersAndPayload,
                 MessagePublishContext.get(this, sequenceId, msgIn,
                         headersAndPayload.readableBytes(), batchSize,
-                        isChunked, System.nanoTime()));
+                        isChunked, System.nanoTime(), isMarker));
     }
 
     private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
-                                       long batchSize, boolean isChunked) {
+                                       long batchSize, boolean isChunked, boolean isMarker) {
         topic.publishMessage(headersAndPayload,
                 MessagePublishContext.get(this, lowestSequenceId,
                         highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
-                        isChunked, System.nanoTime()));
+                        isChunked, System.nanoTime(), isMarker));
     }
 
     private boolean verifyChecksum(ByteBuf headersAndPayload) {
@@ -313,6 +315,7 @@ public class Producer {
         private int msgSize;
         private long batchSize;
         private boolean chunked;
+        private boolean isMarker;
 
         private long startTimeNs;
 
@@ -437,7 +440,7 @@ public class Producer {
         }
 
         static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
-                long batchSize, boolean chunked, long startTimeNs) {
+                long batchSize, boolean chunked, long startTimeNs, boolean isMarker) {
             MessagePublishContext callback = RECYCLER.get();
             callback.producer = producer;
             callback.sequenceId = sequenceId;
@@ -448,11 +451,12 @@ public class Producer {
             callback.originalProducerName = null;
             callback.originalSequenceId = -1L;
             callback.startTimeNs = startTimeNs;
+            callback.isMarker = isMarker;
             return callback;
         }
 
         static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
-                int msgSize, long batchSize, boolean chunked, long startTimeNs) {
+                int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker) {
             MessagePublishContext callback = RECYCLER.get();
             callback.producer = producer;
             callback.sequenceId = lowestSequenceId;
@@ -464,6 +468,7 @@ public class Producer {
             callback.originalSequenceId = -1L;
             callback.startTimeNs = startTimeNs;
             callback.chunked = chunked;
+            callback.isMarker = isMarker;
             return callback;
         }
 
@@ -472,6 +477,11 @@ public class Producer {
             return batchSize;
         }
 
+        @Override
+        public boolean isMarkerMessage() {
+            return isMarker;
+        }
+
         private final Handle<MessagePublishContext> recyclerHandle;
 
         private MessagePublishContext(Handle<MessagePublishContext> recyclerHandle) {
@@ -497,6 +507,7 @@ public class Producer {
             batchSize = 0L;
             startTimeNs = -1L;
             chunked = false;
+            isMarker = false;
             recyclerHandle.recycle(this);
         }
     }
@@ -652,11 +663,11 @@ public class Producer {
     }
 
     public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
-                                  ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
+                                  ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
         checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize);
         topic.publishTxnMessage(txnID, headersAndPayload,
                 MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
-                        headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime()));
+                        headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker));
     }
 
     public SchemaVersion getSchemaVersion() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 84bba9c..340685b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1361,17 +1361,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         if (send.hasTxnidMostBits() && send.hasTxnidLeastBits()) {
             TxnID txnID = new TxnID(send.getTxnidMostBits(), send.getTxnidLeastBits());
             producer.publishTxnMessage(txnID, producer.getProducerId(), send.getSequenceId(),
-                    send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk());
+                    send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(),
+                    send.isMarker());
             return;
         }
 
         // Persist the message
         if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
             producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
-                    headersAndPayload, send.getNumMessages(), send.isIsChunk());
+                    headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker());
         } else {
             producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
-                    send.getNumMessages(), send.isIsChunk());
+                    send.getNumMessages(), send.isIsChunk(), send.isMarker());
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index eaed76c..2b92560 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -94,6 +94,10 @@ public interface Topic {
         default long getNumberOfMessages() {
             return  1L;
         }
+
+        default boolean isMarkerMessage() {
+            return false;
+        }
     }
 
     void publishMessage(ByteBuf headersAndPayload, PublishContext callback);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e165916..371971a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -225,6 +226,9 @@ public class PersistentTopic extends AbstractTopic
     private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
     private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
 
+    // Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
+    private long lastDataMessagePublishedTimestamp = 0;
+
     private static class TopicStatsHelper {
         public double averageMsgSize;
         public double aggMsgRateIn;
@@ -491,6 +495,11 @@ public class PersistentTopic extends AbstractTopic
 
         // Message has been successfully persisted
         messageDeduplication.recordMessagePersisted(publishContext, position);
+
+        if (!publishContext.isMarkerMessage()) {
+            lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
+        }
+
         publishContext.setMetadataFromEntryData(entryData);
         publishContext.completed(null, position.getLedgerId(), position.getEntryId());
         // in order to sync the max position when cursor read entries
@@ -3242,4 +3251,8 @@ public class PersistentTopic extends AbstractTopic
         }
         return subscription.getPendingAckManageLedger();
     }
+
+    public long getLastDataMessagePublishedTimestamp() {
+        return lastDataMessagePublishedTimestamp;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index dc03962..7beeaa0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -36,7 +36,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.pulsar.broker.service.Producer;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.ClusterMessageId;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -56,9 +55,12 @@ import org.apache.pulsar.common.protocol.Markers;
 public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.PublishContext {
     private final PersistentTopic topic;
     private final String localCluster;
+
+    // The timestamp of when the last snapshot was initiated
+    private long lastCompletedSnapshotStartTime = 0;
+
     private String lastCompletedSnapshotId;
 
-    private boolean skippedSnapshotForNoProducers = false;
     private volatile Position positionOfLastLocalMarker;
 
     private final ScheduledFuture<?> timer;
@@ -192,28 +194,7 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
     private void startNewSnapshot() {
         cleanupTimedOutSnapshots();
 
-        boolean hasLocalProducer = false;
-        for (Producer p : topic.getProducers().values()) {
-            if (!p.isRemote()) {
-                hasLocalProducer = true;
-                break;
-            }
-        }
-
-        if (!hasLocalProducer) {
-            if (!skippedSnapshotForNoProducers) {
-                skippedSnapshotForNoProducers = true;
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] There are no local producers: Skipping 1 snapshot", topic.getName());
-                }
-
-                return;
-            }
-        }
-
-        skippedSnapshotForNoProducers = false;
-
-        if (topic.getLastPosition() != null && topic.getLastPosition().equals(positionOfLastLocalMarker)) {
+        if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
             // There was no message written since the last snapshot, we can skip creating a new snapshot
             if (log.isDebugEnabled()) {
                 log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
@@ -269,9 +250,13 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
     }
 
     void snapshotCompleted(String snapshotId) {
-        pendingSnapshots.remove(snapshotId);
+        ReplicatedSubscriptionsSnapshotBuilder snapshot = pendingSnapshots.remove(snapshotId);
         pendingSnapshotsMetric.dec();
         lastCompletedSnapshotId = snapshotId;
+
+        if (snapshot != null) {
+            lastCompletedSnapshotStartTime = snapshot.getStartTimeMillis();
+        }
     }
 
     void writeMarker(ByteBuf marker) {
@@ -305,6 +290,12 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
     }
 
     @Override
+    public boolean isMarkerMessage() {
+        // Everything published by this controller will be a marker a message
+        return true;
+    }
+
+    @Override
     public void close() {
         timer.cancel(true);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
index 42c6138..38fcf7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
@@ -131,4 +131,8 @@ public class ReplicatedSubscriptionsSnapshotBuilder {
     boolean isTimedOut() {
         return (startTimeMillis + timeoutMillis) < clock.millis();
     }
+
+    long getStartTimeMillis() {
+        return startTimeMillis;
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 4484ab9..241ae87 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -504,6 +504,10 @@ public class Commands {
             send.setIsChunk(true);
         }
 
+        if (messageData.hasMarkerType()) {
+            send.setMarker(true);
+        }
+
         return serializeCommandSendWithSize(cmd, checksumType, messageData, payload);
     }
 
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index b26024f..cabd099 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -493,6 +493,9 @@ message CommandSend {
     /// Add highest sequence id to support batch message with external sequence id
     optional uint64 highest_sequence_id = 6 [default = 0];
     optional bool is_chunk     =7 [default = false];
+
+    // Specify if the message being published is a Pulsar marker or not
+    optional bool marker = 8 [default = false];
 }
 
 message CommandSendReceipt {

[pulsar] 03/04: Fixed ProxyConnection to check for existence of auth_data field (#12057)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6d3966c477379b36e38bf5aa5fff029be9221ae9
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 16 08:04:29 2021 -0700

    Fixed ProxyConnection to check for existence of auth_data field (#12057)
    
    ### Motivation
    
    ProxyConnection is not checking whether the optional auth_data field is set or not.
    
    Additionally, if there's an error before the client instance is created, we should avoid calling close on it.
    
    ```
    2021-09-16T00:49:40,213 [pulsar-proxy-io-2-3] WARN  org.apache.pulsar.proxy.server.ProxyConnection - [/10.199.78.158:7165] Unable to authenticate:
    java.lang.IllegalStateException: Field 'auth_data' is not set
        at org.apache.pulsar.common.api.proto.CommandConnect.getAuthDataSlice(CommandConnect.java:90) ~[org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]
        at org.apache.pulsar.common.api.proto.CommandConnect.getAuthData(CommandConnect.java:83) ~[org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]
        at org.apache.pulsar.proxy.server.ProxyConnection.handleConnect(ProxyConnection.java:318) [org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:166) [org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]
        at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:192) [org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1374) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1248) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1288) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
    2021-09-16T00:49:40,215 [pulsar-proxy-io-2-3] WARN  io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usua
    java.lang.NullPointerException: null
        at org.apache.pulsar.proxy.server.ProxyConnection.close(ProxyConnection.java:416) ~[org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]
        at org.apache.pulsar.proxy.server.ProxyConnection.handleConnect(ProxyConnection.java:356) ~[org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:166) ~[org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]
        at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:192) ~[org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1374) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1248) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1288) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
    ```
---
 .../main/java/org/apache/pulsar/proxy/server/ProxyConnection.java   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 25f88102..28c6083 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -315,7 +315,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 return;
             }
 
-            AuthData clientData = AuthData.of(connect.getAuthData());
+            AuthData clientData = AuthData.of(connect.hasAuthData() ? connect.getAuthData() : null);
             if (connect.hasAuthMethodName()) {
                 authMethod = connect.getAuthMethodName();
             } else if (connect.hasAuthMethod()) {
@@ -413,7 +413,9 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         state = State.Closed;
         ctx.close();
         try {
-            client.close();
+            if (client != null) {
+                client.close();
+            }
         } catch (PulsarClientException e) {
             LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
         }