You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/12 03:29:38 UTC

[pulsar] 02/20: fix publish_time not set error when broker entry metadata enable without AppendBrokerTimestampMetadataInterceptor (#11014)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 598acecdb5e1e0c31d51540c0569db6347e23ad0
Author: Aloys <lo...@gmail.com>
AuthorDate: Sun Jul 4 14:47:12 2021 +0800

    fix publish_time not set error when broker entry metadata enable without AppendBrokerTimestampMetadataInterceptor (#11014)
    
    Fixes #11013
    
    ### Motivation
    
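    Fix the "publish_time not set" error that occurs when broker entry metadata is enabled without `AppendBrokerTimestampMetadataInterceptor`. In that configuration an entry carries `BrokerEntryMetadata` but no broker timestamp, so the timestamp checks fell back to a `publish_time` that had never been parsed from the message metadata.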
    
    ### Modifications
    
    1. Add a new static method named `getEntryTimestamp`, which returns the broker timestamp if `BrokerEntryMetadata` is present and has a broker timestamp set, and otherwise returns the `publishTime` parsed from the message metadata.
    2. Use this `entryTimestamp` for expiry checking and for timestamp-based position lookup.
    
    (cherry picked from commit fe7cf676e59e08b34b388fce3a190133d390d61c)
---
 .../persistent/PersistentMessageExpiryMonitor.java |  8 +---
 .../persistent/PersistentMessageFinder.java        |  8 +---
 .../broker/service/persistent/PersistentTopic.java | 19 +++-------
 .../org/apache/pulsar/client/impl/MessageImpl.java | 43 ++++++++--------------
 .../apache/pulsar/client/impl/MessageImplTest.java | 17 ++++-----
 .../pulsar/sql/presto/PulsarSplitManager.java      | 12 +-----
 6 files changed, 35 insertions(+), 72 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index cfed64d..c5b3403 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -73,17 +73,13 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
                     messageTTLInSeconds);
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
-                MessageImpl<byte[]> msg = null;
                 try {
-                    msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
-                    return msg.isExpired(messageTTLInSeconds);
+                    long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
+                    return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
                 } catch (Exception e) {
                     log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
                 } finally {
                     entry.release();
-                    if (msg != null) {
-                        msg.recycle();
-                    }
                 }
                 return false;
             }, this, null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index 346de2b..80a71ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -61,17 +61,13 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback
             }
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
-                MessageImpl<byte[]> msg = null;
                 try {
-                    msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
-                    return msg.publishedEarlierThan(timestamp);
+                    long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
+                    return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
                 } catch (Exception e) {
                     log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e);
                 } finally {
                     entry.release();
-                    if (msg != null) {
-                        msg.recycle();
-                    }
                 }
                 return false;
             }, this, callback);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ca22511..62a0701 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2401,10 +2401,9 @@ public class PersistentTopic extends AbstractTopic
                     new AsyncCallbacks.ReadEntryCallback() {
                         @Override
                         public void readEntryComplete(Entry entry, Object ctx) {
-                            MessageImpl<byte[]> msg = null;
                             try {
-                                msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
-                                boolean expired = msg.isExpired(backlogQuotaLimitInSecond);
+                                long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
+                                boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
                                 if (expired && log.isDebugEnabled()) {
                                     log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlog"
                                     + "exceeded quota {}", ((ManagedLedgerImpl) ledger).getSlowestConsumer().getName(),
@@ -2416,9 +2415,6 @@ public class PersistentTopic extends AbstractTopic
                                 future.complete(false);
                             } finally {
                                 entry.release();
-                                if (msg != null) {
-                                    msg.recycle();
-                                }
                             }
                         }
 
@@ -2490,16 +2486,14 @@ public class PersistentTopic extends AbstractTopic
     }
 
     public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeconds) {
-        MessageImpl<byte[]> msg = null;
         Entry entry = null;
         boolean isOldestMessageExpired = false;
         try {
             entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include);
             if (entry != null) {
-                msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
-                if (messageTTLInSeconds != 0) {
-                    isOldestMessageExpired = msg.isExpired((int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD));
-                }
+                long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
+                isOldestMessageExpired = MessageImpl.isEntryExpired(
+                        (int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD), entryTimestamp);
             }
         } catch (Exception e) {
             log.warn("[{}] Error while getting the oldest message", topic, e);
@@ -2507,9 +2501,6 @@ public class PersistentTopic extends AbstractTopic
             if (entry != null) {
                 entry.release();
             }
-            if (msg != null) {
-                msg.recycle();
-            }
         }
 
         return isOldestMessageExpired;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 7f0c113..948c67e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
@@ -268,31 +269,24 @@ public class MessageImpl<T> implements Message<T> {
         return msg;
     }
 
-    public static MessageImpl<byte[]> deserializeBrokerEntryMetaDataFirst(
-            ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
-        @SuppressWarnings("unchecked")
-        MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
-
-        msg.brokerEntryMetadata =
+    public static long getEntryTimestamp( ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
+        // get broker timestamp first if BrokerEntryMetadata is enabled with AppendBrokerTimestampMetadataInterceptor
+        BrokerEntryMetadata brokerEntryMetadata =
                 Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
-
-        if (msg.brokerEntryMetadata != null) {
-            msg.msgMetadata.clear();
-            msg.payload = null;
-            msg.messageId = null;
-            msg.topic = null;
-            msg.cnx = null;
-            msg.properties = Collections.emptyMap();
-            return msg;
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
+            return brokerEntryMetadata.getBrokerTimestamp();
         }
+        // otherwise get the publish_time
+        return Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata).getPublishTime();
+    }
 
-        Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
-        msg.payload = headersAndPayloadWithBrokerEntryMetadata;
-        msg.messageId = null;
-        msg.topic = null;
-        msg.cnx = null;
-        msg.properties = Collections.emptyMap();
-        return msg;
+    public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) {
+        return messageTTLInSeconds != 0 &&
+                (System.currentTimeMillis() > entryTimestamp + TimeUnit.SECONDS.toMillis(messageTTLInSeconds));
+    }
+
+    public static boolean isEntryPublishedEarlierThan(long entryTimestamp, long timestamp) {
+        return entryTimestamp < timestamp;
     }
 
     public static MessageImpl<byte[]> deserializeSkipBrokerEntryMetaData(
@@ -356,11 +350,6 @@ public class MessageImpl<T> implements Message<T> {
                     brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
     }
 
-    public boolean publishedEarlierThan(long timestamp) {
-        return brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp() ? getPublishTime() < timestamp
-                : brokerEntryMetadata.getBrokerTimestamp() < timestamp;
-    }
-
     @Override
     public byte[] getData() {
         if (msgMetadata.isNullValue()) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 17b77a2..4083c34 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -450,10 +450,9 @@ public class MessageImplTest {
 
             CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
             compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
-            MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
-            MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
-            message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
-            assertTrue(message.isExpired(100));
+            long entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf);
+            assertTrue(MessageImpl.isEntryExpired(100, entryTimestamp));
+            assertEquals(entryTimestamp, 1);
 
             // test BrokerTimestamp set.
             byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
@@ -463,8 +462,9 @@ public class MessageImplTest {
                     .setProducerName("test")
                     .setSequenceId(1);
             byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+            long brokerEntryTimestamp = System.currentTimeMillis();
             brokerMetadata = new BrokerEntryMetadata()
-                    .setBrokerTimestamp(System.currentTimeMillis())
+                    .setBrokerTimestamp(brokerEntryTimestamp)
                     .setIndex(MOCK_BATCH_SIZE - 1);
 
             brokerMetaSize = brokerMetadata.getSerializedSize();
@@ -475,10 +475,9 @@ public class MessageImplTest {
 
             compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
             compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
-            messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
-            message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
-            message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
-            assertFalse(message.isExpired(24 * 3600));
+            entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf);
+            assertFalse(MessageImpl.isEntryExpired(24 * 3600, entryTimestamp));
+            assertEquals(entryTimestamp, brokerEntryTimestamp);
         } catch (IOException e) {
             fail();
         }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index eca1ba87..18502a9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -442,21 +442,13 @@ public class PulsarSplitManager implements ConnectorSplitManager {
         return (PositionImpl) readOnlyCursor.findNewestMatching(SearchAllAvailableEntries, new Predicate<Entry>() {
             @Override
             public boolean apply(Entry entry) {
-
-                MessageImpl<byte[]> msg = null;
                 try {
-                    msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
-                    return msg.getBrokerEntryMetadata() != null
-                            ? msg.getBrokerEntryMetadata().getBrokerTimestamp() <= timestamp
-                            : msg.getPublishTime() <= timestamp;
-
+                    long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
+                    return entryTimestamp <= timestamp;
                 } catch (Exception e) {
                     log.error(e, "Failed To deserialize message when finding position with error: %s", e);
                 } finally {
                     entry.release();
-                    if (msg != null) {
-                        msg.recycle();
-                    }
                 }
                 return false;
             }