You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/12 03:29:38 UTC
[pulsar] 02/20: fix publish_time not set error when broker entry
metadata enable without AppendBrokerTimestampMetadataInterceptor (#11014)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 598acecdb5e1e0c31d51540c0569db6347e23ad0
Author: Aloys <lo...@gmail.com>
AuthorDate: Sun Jul 4 14:47:12 2021 +0800
fix publish_time not set error when broker entry metadata enable without AppendBrokerTimestampMetadataInterceptor (#11014)
Fixes #11013
### Motivation
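Fix the `publish_time` not set error that occurs when broker entry metadata is enabled without the `AppendBrokerTimestampMetadataInterceptor`. In that case an entry carries broker entry metadata but no broker timestamp, so the timestamp lookup falls back to the message's publish time, which the previous code had not parsed.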
### Modifications
1. Add a new method named `getEntryTimestamp`, which returns the broker timestamp when the entry's `BrokerEntryMetadata` is present and has a broker timestamp set, and otherwise returns the message's `publishTime`.
2. Use this `entryTimestamp` for expiry checks and for timestamp-based position lookups.
(cherry picked from commit fe7cf676e59e08b34b388fce3a190133d390d61c)
---
.../persistent/PersistentMessageExpiryMonitor.java | 8 +---
.../persistent/PersistentMessageFinder.java | 8 +---
.../broker/service/persistent/PersistentTopic.java | 19 +++-------
.../org/apache/pulsar/client/impl/MessageImpl.java | 43 ++++++++--------------
.../apache/pulsar/client/impl/MessageImplTest.java | 17 ++++-----
.../pulsar/sql/presto/PulsarSplitManager.java | 12 +-----
6 files changed, 35 insertions(+), 72 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index cfed64d..c5b3403 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -73,17 +73,13 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
messageTTLInSeconds);
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
- MessageImpl<byte[]> msg = null;
try {
- msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
- return msg.isExpired(messageTTLInSeconds);
+ long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
+ return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
} finally {
entry.release();
- if (msg != null) {
- msg.recycle();
- }
}
return false;
}, this, null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index 346de2b..80a71ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -61,17 +61,13 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback
}
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
- MessageImpl<byte[]> msg = null;
try {
- msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
- return msg.publishedEarlierThan(timestamp);
+ long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
+ return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e);
} finally {
entry.release();
- if (msg != null) {
- msg.recycle();
- }
}
return false;
}, this, callback);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ca22511..62a0701 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2401,10 +2401,9 @@ public class PersistentTopic extends AbstractTopic
new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
- MessageImpl<byte[]> msg = null;
try {
- msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
- boolean expired = msg.isExpired(backlogQuotaLimitInSecond);
+ long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
+ boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
if (expired && log.isDebugEnabled()) {
log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlog"
+ "exceeded quota {}", ((ManagedLedgerImpl) ledger).getSlowestConsumer().getName(),
@@ -2416,9 +2415,6 @@ public class PersistentTopic extends AbstractTopic
future.complete(false);
} finally {
entry.release();
- if (msg != null) {
- msg.recycle();
- }
}
}
@@ -2490,16 +2486,14 @@ public class PersistentTopic extends AbstractTopic
}
public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeconds) {
- MessageImpl<byte[]> msg = null;
Entry entry = null;
boolean isOldestMessageExpired = false;
try {
entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include);
if (entry != null) {
- msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
- if (messageTTLInSeconds != 0) {
- isOldestMessageExpired = msg.isExpired((int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD));
- }
+ long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
+ isOldestMessageExpired = MessageImpl.isEntryExpired(
+ (int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD), entryTimestamp);
}
} catch (Exception e) {
log.warn("[{}] Error while getting the oldest message", topic, e);
@@ -2507,9 +2501,6 @@ public class PersistentTopic extends AbstractTopic
if (entry != null) {
entry.release();
}
- if (msg != null) {
- msg.recycle();
- }
}
return isOldestMessageExpired;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 7f0c113..948c67e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
@@ -268,31 +269,24 @@ public class MessageImpl<T> implements Message<T> {
return msg;
}
- public static MessageImpl<byte[]> deserializeBrokerEntryMetaDataFirst(
- ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
- @SuppressWarnings("unchecked")
- MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
-
- msg.brokerEntryMetadata =
+ public static long getEntryTimestamp( ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
+ // get broker timestamp first if BrokerEntryMetadata is enabled with AppendBrokerTimestampMetadataInterceptor
+ BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
-
- if (msg.brokerEntryMetadata != null) {
- msg.msgMetadata.clear();
- msg.payload = null;
- msg.messageId = null;
- msg.topic = null;
- msg.cnx = null;
- msg.properties = Collections.emptyMap();
- return msg;
+ if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
+ return brokerEntryMetadata.getBrokerTimestamp();
}
+ // otherwise get the publish_time
+ return Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata).getPublishTime();
+ }
- Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
- msg.payload = headersAndPayloadWithBrokerEntryMetadata;
- msg.messageId = null;
- msg.topic = null;
- msg.cnx = null;
- msg.properties = Collections.emptyMap();
- return msg;
+ public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) {
+ return messageTTLInSeconds != 0 &&
+ (System.currentTimeMillis() > entryTimestamp + TimeUnit.SECONDS.toMillis(messageTTLInSeconds));
+ }
+
+ public static boolean isEntryPublishedEarlierThan(long entryTimestamp, long timestamp) {
+ return entryTimestamp < timestamp;
}
public static MessageImpl<byte[]> deserializeSkipBrokerEntryMetaData(
@@ -356,11 +350,6 @@ public class MessageImpl<T> implements Message<T> {
brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
}
- public boolean publishedEarlierThan(long timestamp) {
- return brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp() ? getPublishTime() < timestamp
- : brokerEntryMetadata.getBrokerTimestamp() < timestamp;
- }
-
@Override
public byte[] getData() {
if (msgMetadata.isNullValue()) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 17b77a2..4083c34 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -450,10 +450,9 @@ public class MessageImplTest {
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
- MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
- MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
- message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
- assertTrue(message.isExpired(100));
+ long entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf);
+ assertTrue(MessageImpl.isEntryExpired(100, entryTimestamp));
+ assertEquals(entryTimestamp, 1);
// test BrokerTimestamp set.
byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
@@ -463,8 +462,9 @@ public class MessageImplTest {
.setProducerName("test")
.setSequenceId(1);
byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+ long brokerEntryTimestamp = System.currentTimeMillis();
brokerMetadata = new BrokerEntryMetadata()
- .setBrokerTimestamp(System.currentTimeMillis())
+ .setBrokerTimestamp(brokerEntryTimestamp)
.setIndex(MOCK_BATCH_SIZE - 1);
brokerMetaSize = brokerMetadata.getSerializedSize();
@@ -475,10 +475,9 @@ public class MessageImplTest {
compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
- messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
- message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
- message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
- assertFalse(message.isExpired(24 * 3600));
+ entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf);
+ assertFalse(MessageImpl.isEntryExpired(24 * 3600, entryTimestamp));
+ assertEquals(entryTimestamp, brokerEntryTimestamp);
} catch (IOException e) {
fail();
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index eca1ba87..18502a9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -442,21 +442,13 @@ public class PulsarSplitManager implements ConnectorSplitManager {
return (PositionImpl) readOnlyCursor.findNewestMatching(SearchAllAvailableEntries, new Predicate<Entry>() {
@Override
public boolean apply(Entry entry) {
-
- MessageImpl<byte[]> msg = null;
try {
- msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
- return msg.getBrokerEntryMetadata() != null
- ? msg.getBrokerEntryMetadata().getBrokerTimestamp() <= timestamp
- : msg.getPublishTime() <= timestamp;
-
+ long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
+ return entryTimestamp <= timestamp;
} catch (Exception e) {
log.error(e, "Failed To deserialize message when finding position with error: %s", e);
} finally {
entry.release();
- if (msg != null) {
- msg.recycle();
- }
}
return false;
}