You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/16 16:15:34 UTC
[pulsar] 04/04: Improved logic for pausing replicated subscription
snapshots when no traffic (#11922)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 28a223c50009c99d040c0a9f1f4702c75d026f38
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 16 08:28:36 2021 -0700
Improved logic for pausing replicated subscription snapshots when no traffic (#11922)
* Improved logic for pausing replicated subscription snapshots when no traffic
* Removed unused import
* Fixed flaky test ReplicatorTest.testRemoveClusterFromNamespace
* Fixed cast that was not available in tests
---
.../org/apache/pulsar/broker/service/Producer.java | 35 +++++++++++-------
.../apache/pulsar/broker/service/ServerCnx.java | 7 ++--
.../org/apache/pulsar/broker/service/Topic.java | 4 +++
.../broker/service/persistent/PersistentTopic.java | 13 +++++++
.../ReplicatedSubscriptionsController.java | 41 +++++++++-------------
.../ReplicatedSubscriptionsSnapshotBuilder.java | 4 +++
.../apache/pulsar/common/protocol/Commands.java | 4 +++
pulsar-common/src/main/proto/PulsarApi.proto | 3 ++
8 files changed, 71 insertions(+), 40 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 12697f6..d13e2f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -157,14 +157,14 @@ public class Producer {
}
public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
- boolean isChunked) {
+ boolean isChunked, boolean isMarker) {
if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) {
- publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked);
+ publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker);
}
}
public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
- ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
+ ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
if (lowestSequenceId > highestSequenceId) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError,
@@ -174,7 +174,8 @@ public class Producer {
return;
}
if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) {
- publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked);
+ publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked,
+ isMarker);
}
}
@@ -219,19 +220,20 @@ public class Producer {
return true;
}
- private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked) {
+ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
+ boolean isMarker) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, sequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize,
- isChunked, System.nanoTime()));
+ isChunked, System.nanoTime(), isMarker));
}
private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
- long batchSize, boolean isChunked) {
+ long batchSize, boolean isChunked, boolean isMarker) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
- isChunked, System.nanoTime()));
+ isChunked, System.nanoTime(), isMarker));
}
private boolean verifyChecksum(ByteBuf headersAndPayload) {
@@ -313,6 +315,7 @@ public class Producer {
private int msgSize;
private long batchSize;
private boolean chunked;
+ private boolean isMarker;
private long startTimeNs;
@@ -437,7 +440,7 @@ public class Producer {
}
static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
- long batchSize, boolean chunked, long startTimeNs) {
+ long batchSize, boolean chunked, long startTimeNs, boolean isMarker) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = sequenceId;
@@ -448,11 +451,12 @@ public class Producer {
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
+ callback.isMarker = isMarker;
return callback;
}
static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
- int msgSize, long batchSize, boolean chunked, long startTimeNs) {
+ int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
@@ -464,6 +468,7 @@ public class Producer {
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.chunked = chunked;
+ callback.isMarker = isMarker;
return callback;
}
@@ -472,6 +477,11 @@ public class Producer {
return batchSize;
}
+ @Override
+ public boolean isMarkerMessage() {
+ return isMarker;
+ }
+
private final Handle<MessagePublishContext> recyclerHandle;
private MessagePublishContext(Handle<MessagePublishContext> recyclerHandle) {
@@ -497,6 +507,7 @@ public class Producer {
batchSize = 0L;
startTimeNs = -1L;
chunked = false;
+ isMarker = false;
recyclerHandle.recycle(this);
}
}
@@ -652,11 +663,11 @@ public class Producer {
}
public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
- ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
+ ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize);
topic.publishTxnMessage(txnID, headersAndPayload,
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
- headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime()));
+ headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker));
}
public SchemaVersion getSchemaVersion() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 84bba9c..340685b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1361,17 +1361,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (send.hasTxnidMostBits() && send.hasTxnidLeastBits()) {
TxnID txnID = new TxnID(send.getTxnidMostBits(), send.getTxnidLeastBits());
producer.publishTxnMessage(txnID, producer.getProducerId(), send.getSequenceId(),
- send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk());
+ send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(),
+ send.isMarker());
return;
}
// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
- headersAndPayload, send.getNumMessages(), send.isIsChunk());
+ headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker());
} else {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
- send.getNumMessages(), send.isIsChunk());
+ send.getNumMessages(), send.isIsChunk(), send.isMarker());
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index eaed76c..2b92560 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -94,6 +94,10 @@ public interface Topic {
default long getNumberOfMessages() {
return 1L;
}
+
+ default boolean isMarkerMessage() {
+ return false;
+ }
}
void publishMessage(ByteBuf headersAndPayload, PublishContext callback);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e165916..371971a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -225,6 +226,9 @@ public class PersistentTopic extends AbstractTopic
private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
+ // Record the last time a data message (i.e., not an internal Pulsar marker) was published on the topic
+ private long lastDataMessagePublishedTimestamp = 0;
+
private static class TopicStatsHelper {
public double averageMsgSize;
public double aggMsgRateIn;
@@ -491,6 +495,11 @@ public class PersistentTopic extends AbstractTopic
// Message has been successfully persisted
messageDeduplication.recordMessagePersisted(publishContext, position);
+
+ if (!publishContext.isMarkerMessage()) {
+ lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
+ }
+
publishContext.setMetadataFromEntryData(entryData);
publishContext.completed(null, position.getLedgerId(), position.getEntryId());
// in order to sync the max position when cursor read entries
@@ -3242,4 +3251,8 @@ public class PersistentTopic extends AbstractTopic
}
return subscription.getPendingAckManageLedger();
}
+
+ public long getLastDataMessagePublishedTimestamp() {
+ return lastDataMessagePublishedTimestamp;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index dc03962..7beeaa0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -36,7 +36,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.ClusterMessageId;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -56,9 +55,12 @@ import org.apache.pulsar.common.protocol.Markers;
public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.PublishContext {
private final PersistentTopic topic;
private final String localCluster;
+
+ // The start timestamp of the most recently completed snapshot
+ private long lastCompletedSnapshotStartTime = 0;
+
private String lastCompletedSnapshotId;
- private boolean skippedSnapshotForNoProducers = false;
private volatile Position positionOfLastLocalMarker;
private final ScheduledFuture<?> timer;
@@ -192,28 +194,7 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
private void startNewSnapshot() {
cleanupTimedOutSnapshots();
- boolean hasLocalProducer = false;
- for (Producer p : topic.getProducers().values()) {
- if (!p.isRemote()) {
- hasLocalProducer = true;
- break;
- }
- }
-
- if (!hasLocalProducer) {
- if (!skippedSnapshotForNoProducers) {
- skippedSnapshotForNoProducers = true;
- if (log.isDebugEnabled()) {
- log.debug("[{}] There are no local producers: Skipping 1 snapshot", topic.getName());
- }
-
- return;
- }
- }
-
- skippedSnapshotForNoProducers = false;
-
- if (topic.getLastPosition() != null && topic.getLastPosition().equals(positionOfLastLocalMarker)) {
+ if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
// There was no message written since the last snapshot, we can skip creating a new snapshot
if (log.isDebugEnabled()) {
log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
@@ -269,9 +250,13 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
}
void snapshotCompleted(String snapshotId) {
- pendingSnapshots.remove(snapshotId);
+ ReplicatedSubscriptionsSnapshotBuilder snapshot = pendingSnapshots.remove(snapshotId);
pendingSnapshotsMetric.dec();
lastCompletedSnapshotId = snapshotId;
+
+ if (snapshot != null) {
+ lastCompletedSnapshotStartTime = snapshot.getStartTimeMillis();
+ }
}
void writeMarker(ByteBuf marker) {
@@ -305,6 +290,12 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
}
@Override
+ public boolean isMarkerMessage() {
+ // Everything published by this controller is a marker message
+ return true;
+ }
+
+ @Override
public void close() {
timer.cancel(true);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
index 42c6138..38fcf7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
@@ -131,4 +131,8 @@ public class ReplicatedSubscriptionsSnapshotBuilder {
boolean isTimedOut() {
return (startTimeMillis + timeoutMillis) < clock.millis();
}
+
+ long getStartTimeMillis() {
+ return startTimeMillis;
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 4484ab9..241ae87 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -504,6 +504,10 @@ public class Commands {
send.setIsChunk(true);
}
+ if (messageData.hasMarkerType()) {
+ send.setMarker(true);
+ }
+
return serializeCommandSendWithSize(cmd, checksumType, messageData, payload);
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index b26024f..cabd099 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -493,6 +493,9 @@ message CommandSend {
/// Add highest sequence id to support batch message with external sequence id
optional uint64 highest_sequence_id = 6 [default = 0];
optional bool is_chunk =7 [default = false];
+
+ // Specify if the message being published is a Pulsar marker or not
+ optional bool marker = 8 [default = false];
}
message CommandSendReceipt {