You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/16 16:15:34 UTC

[pulsar] 04/04: Improved logic for pausing replicated subscription snapshots when no traffic (#11922)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 28a223c50009c99d040c0a9f1f4702c75d026f38
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 16 08:28:36 2021 -0700

    Improved logic for pausing replicated subscription snapshots when no traffic (#11922)
    
    * Improved logic for pausing replicated subscription snapshots when no traffic
    
    * Removed unused import
    
    * Fixed flaky test ReplicatorTest.testRemoveClusterFromNamespace
    
    * Fixed cast that was not available in tests
---
 .../org/apache/pulsar/broker/service/Producer.java | 35 +++++++++++-------
 .../apache/pulsar/broker/service/ServerCnx.java    |  7 ++--
 .../org/apache/pulsar/broker/service/Topic.java    |  4 +++
 .../broker/service/persistent/PersistentTopic.java | 13 +++++++
 .../ReplicatedSubscriptionsController.java         | 41 +++++++++-------------
 .../ReplicatedSubscriptionsSnapshotBuilder.java    |  4 +++
 .../apache/pulsar/common/protocol/Commands.java    |  4 +++
 pulsar-common/src/main/proto/PulsarApi.proto       |  3 ++
 8 files changed, 71 insertions(+), 40 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 12697f6..d13e2f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -157,14 +157,14 @@ public class Producer {
     }
 
     public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
-            boolean isChunked) {
+            boolean isChunked, boolean isMarker) {
         if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) {
-            publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked);
+            publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker);
         }
     }
 
     public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
-            ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
+            ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
         if (lowestSequenceId > highestSequenceId) {
             cnx.execute(() -> {
                 cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError,
@@ -174,7 +174,8 @@ public class Producer {
             return;
         }
         if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) {
-            publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked);
+            publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked,
+                    isMarker);
         }
     }
 
@@ -219,19 +220,20 @@ public class Producer {
         return true;
     }
 
-    private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked) {
+    private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
+                                       boolean isMarker) {
         topic.publishMessage(headersAndPayload,
                 MessagePublishContext.get(this, sequenceId, msgIn,
                         headersAndPayload.readableBytes(), batchSize,
-                        isChunked, System.nanoTime()));
+                        isChunked, System.nanoTime(), isMarker));
     }
 
     private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
-                                       long batchSize, boolean isChunked) {
+                                       long batchSize, boolean isChunked, boolean isMarker) {
         topic.publishMessage(headersAndPayload,
                 MessagePublishContext.get(this, lowestSequenceId,
                         highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
-                        isChunked, System.nanoTime()));
+                        isChunked, System.nanoTime(), isMarker));
     }
 
     private boolean verifyChecksum(ByteBuf headersAndPayload) {
@@ -313,6 +315,7 @@ public class Producer {
         private int msgSize;
         private long batchSize;
         private boolean chunked;
+        private boolean isMarker;
 
         private long startTimeNs;
 
@@ -437,7 +440,7 @@ public class Producer {
         }
 
         static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
-                long batchSize, boolean chunked, long startTimeNs) {
+                long batchSize, boolean chunked, long startTimeNs, boolean isMarker) {
             MessagePublishContext callback = RECYCLER.get();
             callback.producer = producer;
             callback.sequenceId = sequenceId;
@@ -448,11 +451,12 @@ public class Producer {
             callback.originalProducerName = null;
             callback.originalSequenceId = -1L;
             callback.startTimeNs = startTimeNs;
+            callback.isMarker = isMarker;
             return callback;
         }
 
         static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
-                int msgSize, long batchSize, boolean chunked, long startTimeNs) {
+                int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker) {
             MessagePublishContext callback = RECYCLER.get();
             callback.producer = producer;
             callback.sequenceId = lowestSequenceId;
@@ -464,6 +468,7 @@ public class Producer {
             callback.originalSequenceId = -1L;
             callback.startTimeNs = startTimeNs;
             callback.chunked = chunked;
+            callback.isMarker = isMarker;
             return callback;
         }
 
@@ -472,6 +477,11 @@ public class Producer {
             return batchSize;
         }
 
+        @Override
+        public boolean isMarkerMessage() {
+            return isMarker;
+        }
+
         private final Handle<MessagePublishContext> recyclerHandle;
 
         private MessagePublishContext(Handle<MessagePublishContext> recyclerHandle) {
@@ -497,6 +507,7 @@ public class Producer {
             batchSize = 0L;
             startTimeNs = -1L;
             chunked = false;
+            isMarker = false;
             recyclerHandle.recycle(this);
         }
     }
@@ -652,11 +663,11 @@ public class Producer {
     }
 
     public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
-                                  ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
+                                  ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
         checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize);
         topic.publishTxnMessage(txnID, headersAndPayload,
                 MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
-                        headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime()));
+                        headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker));
     }
 
     public SchemaVersion getSchemaVersion() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 84bba9c..340685b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1361,17 +1361,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         if (send.hasTxnidMostBits() && send.hasTxnidLeastBits()) {
             TxnID txnID = new TxnID(send.getTxnidMostBits(), send.getTxnidLeastBits());
             producer.publishTxnMessage(txnID, producer.getProducerId(), send.getSequenceId(),
-                    send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk());
+                    send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(),
+                    send.isMarker());
             return;
         }
 
         // Persist the message
         if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
             producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
-                    headersAndPayload, send.getNumMessages(), send.isIsChunk());
+                    headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker());
         } else {
             producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
-                    send.getNumMessages(), send.isIsChunk());
+                    send.getNumMessages(), send.isIsChunk(), send.isMarker());
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index eaed76c..2b92560 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -94,6 +94,10 @@ public interface Topic {
         default long getNumberOfMessages() {
             return  1L;
         }
+
+        default boolean isMarkerMessage() {
+            return false;
+        }
     }
 
     void publishMessage(ByteBuf headersAndPayload, PublishContext callback);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e165916..371971a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -225,6 +226,9 @@ public class PersistentTopic extends AbstractTopic
     private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
     private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
 
+    // Record the last time a data message (i.e. not an internal Pulsar marker) was published on the topic
+    private long lastDataMessagePublishedTimestamp = 0;
+
     private static class TopicStatsHelper {
         public double averageMsgSize;
         public double aggMsgRateIn;
@@ -491,6 +495,11 @@ public class PersistentTopic extends AbstractTopic
 
         // Message has been successfully persisted
         messageDeduplication.recordMessagePersisted(publishContext, position);
+
+        if (!publishContext.isMarkerMessage()) {
+            lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
+        }
+
         publishContext.setMetadataFromEntryData(entryData);
         publishContext.completed(null, position.getLedgerId(), position.getEntryId());
         // in order to sync the max position when cursor read entries
@@ -3242,4 +3251,8 @@ public class PersistentTopic extends AbstractTopic
         }
         return subscription.getPendingAckManageLedger();
     }
+
+    public long getLastDataMessagePublishedTimestamp() {
+        return lastDataMessagePublishedTimestamp;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index dc03962..7beeaa0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -36,7 +36,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.pulsar.broker.service.Producer;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.ClusterMessageId;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -56,9 +55,12 @@ import org.apache.pulsar.common.protocol.Markers;
 public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.PublishContext {
     private final PersistentTopic topic;
     private final String localCluster;
+
+    // The start timestamp of the most recently completed snapshot
+    private long lastCompletedSnapshotStartTime = 0;
+
     private String lastCompletedSnapshotId;
 
-    private boolean skippedSnapshotForNoProducers = false;
     private volatile Position positionOfLastLocalMarker;
 
     private final ScheduledFuture<?> timer;
@@ -192,28 +194,7 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
     private void startNewSnapshot() {
         cleanupTimedOutSnapshots();
 
-        boolean hasLocalProducer = false;
-        for (Producer p : topic.getProducers().values()) {
-            if (!p.isRemote()) {
-                hasLocalProducer = true;
-                break;
-            }
-        }
-
-        if (!hasLocalProducer) {
-            if (!skippedSnapshotForNoProducers) {
-                skippedSnapshotForNoProducers = true;
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] There are no local producers: Skipping 1 snapshot", topic.getName());
-                }
-
-                return;
-            }
-        }
-
-        skippedSnapshotForNoProducers = false;
-
-        if (topic.getLastPosition() != null && topic.getLastPosition().equals(positionOfLastLocalMarker)) {
+        if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
             // There was no message written since the last snapshot, we can skip creating a new snapshot
             if (log.isDebugEnabled()) {
                 log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
@@ -269,9 +250,13 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
     }
 
     void snapshotCompleted(String snapshotId) {
-        pendingSnapshots.remove(snapshotId);
+        ReplicatedSubscriptionsSnapshotBuilder snapshot = pendingSnapshots.remove(snapshotId);
         pendingSnapshotsMetric.dec();
         lastCompletedSnapshotId = snapshotId;
+
+        if (snapshot != null) {
+            lastCompletedSnapshotStartTime = snapshot.getStartTimeMillis();
+        }
     }
 
     void writeMarker(ByteBuf marker) {
@@ -305,6 +290,12 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
     }
 
     @Override
+    public boolean isMarkerMessage() {
+        // Everything published by this controller will be a marker message
+        return true;
+    }
+
+    @Override
     public void close() {
         timer.cancel(true);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
index 42c6138..38fcf7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
@@ -131,4 +131,8 @@ public class ReplicatedSubscriptionsSnapshotBuilder {
     boolean isTimedOut() {
         return (startTimeMillis + timeoutMillis) < clock.millis();
     }
+
+    long getStartTimeMillis() {
+        return startTimeMillis;
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 4484ab9..241ae87 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -504,6 +504,10 @@ public class Commands {
             send.setIsChunk(true);
         }
 
+        if (messageData.hasMarkerType()) {
+            send.setMarker(true);
+        }
+
         return serializeCommandSendWithSize(cmd, checksumType, messageData, payload);
     }
 
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index b26024f..cabd099 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -493,6 +493,9 @@ message CommandSend {
     /// Add highest sequence id to support batch message with external sequence id
     optional uint64 highest_sequence_id = 6 [default = 0];
     optional bool is_chunk     =7 [default = false];
+
+    // Specify if the message being published is a Pulsar marker or not
+    optional bool marker = 8 [default = false];
 }
 
 message CommandSendReceipt {