You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/04 07:17:09 UTC

[GitHub] [pulsar] LeBW opened a new pull request #11553: Exposing the broker entry metadata to client

LeBW opened a new pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553


   <!--
   ### Contribution Checklist
     
     - Name the pull request in the form "[Issue XYZ][component] Title of the pull request", where *XYZ* should be replaced by the actual issue number.
       Skip *Issue XYZ* if there is no associated github issue for this pull request.
       Skip *component* if you are unsure about which is the best component. E.g. `[docs] Fix typo in produce method`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ### Motivation
   This is an improvement to [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata). Now the client is able to get the broker entry metadata.
   
   ### Modifications
   1. increment the client protocol version from 17 to 18.
   2. add a configuration `enableExposingBrokerEntryMetadataToClient` in the broker configuration.
   3. Let the broker send the broker entry metadata to the client when `client protocol version >= 18` and `enableExposingBrokerEntryMetadataToClient` is true.
   4. Add client support for broker entry metadata. Add two methods `getBrokerPublishTime()` and `getIndex()` to utilize the broker entry metadata in `Message`.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   Added test for `getBrokerPublishTime()` and `getIndex()`.
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (**yes**)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (**yes**)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   #### For contributor
   
   For this PR, do we need to update docs?
   
   - If yes, please update docs or create a follow-up issue if you need help.
     
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r690832856



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,13 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()

Review comment:
       It is better to use the `FeatureFlag` mechanism instead of checking the `ProtocolVersion`. This will ensure clients can phase in with the support of this feature.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -185,6 +185,7 @@
     private boolean preciseTopicPublishRateLimitingEnable;
     private boolean encryptionRequireOnProducer;
 
+    private boolean enableExposingBrokerEntryMetadataToClient;

Review comment:
       It's better to use the `FeatureFlags` mechanism in this case, otherwise we'd have a problem with C++ client that is using the latest version of proto file but won't (yet) be able to discard the extra header.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r691230182



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,13 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()

Review comment:
       > It is better to use the `FeatureFlag` mechanism instead of checking the `ProtocolVersion`. This will ensure clients can phase in with the support of this feature.
   
   Thanks for your suggestion. Maybe it's better to check both the `FeatureFlag` and the `ProtocolVersion` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r686933760



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -663,6 +664,22 @@ public void release() {
         }
     }
 
+    @Override
+    public Optional<Long> getBrokerPublishTime() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
+            return Optional.of(brokerEntryMetadata.getBrokerTimestamp());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Long> getIndex() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-893962210


   > Great work. I left a few comments
   > 
   > This is an API change and wire protocol change, can you please advertise it on the dev@ mailing list?
   
   Thanks for your suggestion. I'll write an email to advertise it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet edited a comment on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-893269261


   @LeBW you can ping me to review the doc and look forward to your contribution, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r690962783



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,13 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()

Review comment:
       that's interesting. I wasn't aware of that mechanism




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r685177525



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -186,6 +186,7 @@
     private boolean preciseTopicPublishRateLimitingEnable;
     private boolean encryptionRequireOnProducer;
 
+    private boolean exposingBrokerEntryMetadataToClientEnabled;

Review comment:
       Why do we need to add ServerCnx? Is it able to get from the ServiceConfiguration directly?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1236,8 +1240,9 @@ private void interceptAndComplete(final Message<T> message, final CompletableFut
         completePendingReceive(receivedFuture, interceptMessage);
     }
 
-    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
-            MessageIdData messageId, ClientCnx cnx) {
+    void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata,

Review comment:
       Could you please help add a test to cover the batch message case?

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
##########
@@ -245,4 +245,22 @@
      * @since 2.8.0
      */
     void release();
+
+    /**
+     * Get broker publish time from broker entry metadata.
+     * Note that only if the feature is enabled in the broker then the value is available.
+     *
+     * @since 2.9.0
+     * @return broker publish time from broker entry metadata, or empty if the feature is not enabled in the broker.
+     */
+    Optional<Long> getBrokerPublishTime();

Review comment:
       It's better to add a method for checking if has broker publish time, such as` boolean hasBrokerPublishTime()`;

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -663,6 +664,22 @@ public void release() {
         }
     }
 
+    @Override
+    public Optional<Long> getBrokerPublishTime() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
+            return Optional.of(brokerEntryMetadata.getBrokerTimestamp());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Long> getIndex() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {

Review comment:
       For a batch message, we only have one index in the broker entry metadata, we will get the same index for the individual messages of a batch.

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
##########
@@ -245,4 +245,22 @@
      * @since 2.8.0
      */
     void release();
+
+    /**
+     * Get broker publish time from broker entry metadata.
+     * Note that only if the feature is enabled in the broker then the value is available.
+     *
+     * @since 2.9.0
+     * @return broker publish time from broker entry metadata, or empty if the feature is not enabled in the broker.
+     */
+    Optional<Long> getBrokerPublishTime();
+
+    /**
+     * Get index from broker entry metadata.
+     * Note that only if the feature is enabled in the broker then the value is available.
+     *
+     * @since 2.9.0
+     * @return index from broker entry metadata, or empty if the feature is not enabled in the broker.
+     */
+    Optional<Long> getIndex();

Review comment:
       It better to add a method for checking if has index, such as `boolean hasIndex()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r690832856



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,13 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()

Review comment:
       It is better to use the `FeatureFlag` mechanism instead of checking the `ProtocolVersion`. This will ensure clients can phase in with the support of this feature.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -185,6 +185,7 @@
     private boolean preciseTopicPublishRateLimitingEnable;
     private boolean encryptionRequireOnProducer;
 
+    private boolean enableExposingBrokerEntryMetadataToClient;

Review comment:
       It's better to use the `FeatureFlags` mechanism in this case, otherwise we'd have a problem with C++ client that is using the latest version of proto file but won't (yet) be able to discard the extra header.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r687646847



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,12 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()
+                        || !cnx.isExposingBrokerEntryMetadataToClientEnabled()) {

Review comment:
       We can get the configuration through the following dependency paths
   PulsarCommandSenderImpl -> ServerCnx -> BrokerService -> PulsarService -> configuration
   `isExposingBrokerEntryMetadataToClientEnabled` in ServerCnx can be removed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r684016670



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
##########
@@ -245,4 +245,20 @@
      * @since 2.8.0
      */
     void release();
+
+    /**
+     * Get broker publish time from broker entry metadata.
+     *
+     * @since 2.9.0
+     * @return broker publish time from broker entry metadata
+     */
+    long getBrokerPublishTime();
+
+    /**
+     * Get index from broker entry metadata.
+     *
+     * @Since 2.9.0
+     * @return index from broker entry metadata
+     */
+    long getIndex();

Review comment:
       Thanks. The index is from [PR-9039](https://github.com/apache/pulsar/pull/9039), which provides continuous message sequence-Id for messages in one topic-partition.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-893218000






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-893259638


   > @LeBW thanks for your suggestion. Would you like to add some docs from your perspective?
   
   Yeah, I can do it later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r687458847



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -273,6 +274,78 @@ public void testGetLastMessageId() throws Exception {
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscriptionName(subscription)
                 .subscribe();
-        consumer.getLastMessageId();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForIndividualMessage() throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        long sendTime = System.currentTimeMillis();
+
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.send(String.valueOf(i).getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            Message<byte[]> received = consumer.receive();
+            Assert.assertTrue(
+                    received.hasBrokerPublishTime() && received.getBrokerPublishTime().orElse(-1L) >= sendTime);
+            Assert.assertTrue(received.hasIndex() && received.getIndex().orElse(-1L) == i);
+        }
+        producer.close();
+        consumer.close();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForBatchMessage() throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)

Review comment:
       Got it. Fixed.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -663,6 +664,36 @@ public void release() {
         }
     }
 
+    @Override
+    public boolean hasBrokerPublishTime() {
+        return brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp();
+    }
+
+    @Override
+    public Optional<Long> getBrokerPublishTime() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
+            return Optional.of(brokerEntryMetadata.getBrokerTimestamp());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public boolean hasIndex() {
+        return brokerEntryMetadata != null && brokerEntryMetadata.hasIndex();
+    }
+
+    @Override
+    public Optional<Long> getIndex() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+            if (msgMetadata.hasNumMessagesInBatch() && messageId instanceof BatchMessageIdImpl) {
+                int batchIndex = ((BatchMessageIdImpl) messageId).getBatchIndex();
+                return Optional.of(brokerEntryMetadata.getIndex() - batchIndex);

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r684859310



##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -1005,6 +1005,11 @@
             doc = "List of interceptors for entry metadata.")
     private Set<String> brokerEntryMetadataInterceptors = new HashSet<>();
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Enable or disable exposing broker entry metadata to client.")
+    private boolean enableExposingBrokerEntryMetadataToClient = false;

Review comment:
       `exposingBrokerEntryMetadataToClientEnabled` is better




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet edited a comment on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-893269261


   @LeBW you can ping me to review the doc and look forward to your contribution, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r691230182



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,13 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()

Review comment:
       > It is better to use the `FeatureFlag` mechanism instead of checking the `ProtocolVersion`. This will ensure clients can phase in with the support of this feature.
   
   Thanks for your suggestion. Maybe it's better to check the `FeatureFlag` and the `ProtocolVersion` Both?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,13 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()

Review comment:
       > It is better to use the `FeatureFlag` mechanism instead of checking the `ProtocolVersion`. This will ensure clients can phase in with the support of this feature.
   
   Thanks for your suggestion. Maybe it's better to check both the `FeatureFlag` and the `ProtocolVersion` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r684015519



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -273,6 +274,32 @@ public void testGetLastMessageId() throws Exception {
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscriptionName(subscription)
                 .subscribe();
-        consumer.getLastMessageId();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadata() throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        long sendTime = System.currentTimeMillis();
+        producer.send("hello".getBytes());
+
+        Message<byte[]> received = consumer.receive();
+        Assert.assertEquals("hello", new String(received.getData()));
+
+        Assert.assertTrue(received.getBrokerPublishTime() >= sendTime);
+        Assert.assertEquals(received.getIndex(), 0);

Review comment:
       Fixed. Improve it by sending 10 messages and the index should be from 0 to 9.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-893056582






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r686933272



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
##########
@@ -245,4 +245,22 @@
      * @since 2.8.0
      */
     void release();
+
+    /**
+     * Get broker publish time from broker entry metadata.
+     * Note that only if the feature is enabled in the broker then the value is available.
+     *
+     * @since 2.9.0
+     * @return broker publish time from broker entry metadata, or empty if the feature is not enabled in the broker.
+     */
+    Optional<Long> getBrokerPublishTime();
+
+    /**
+     * Get index from broker entry metadata.
+     * Note that only if the feature is enabled in the broker then the value is available.
+     *
+     * @since 2.9.0
+     * @return index from broker entry metadata, or empty if the feature is not enabled in the broker.
+     */
+    Optional<Long> getIndex();

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r683800227



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -273,6 +274,32 @@ public void testGetLastMessageId() throws Exception {
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscriptionName(subscription)
                 .subscribe();
-        consumer.getLastMessageId();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadata() throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        long sendTime = System.currentTimeMillis();
+        producer.send("hello".getBytes());
+
+        Message<byte[]> received = consumer.receive();
+        Assert.assertEquals("hello", new String(received.getData()));
+
+        Assert.assertTrue(received.getBrokerPublishTime() >= sendTime);
+        Assert.assertEquals(received.getIndex(), 0);

Review comment:
       Zero is an unfortunate value, can we test some value that we are sure it is not a typical default value?

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
##########
@@ -245,4 +245,20 @@
      * @since 2.8.0
      */
     void release();
+
+    /**
+     * Get broker publish time from broker entry metadata.

Review comment:
       Please explain that this value is not available if the broker is older or it does not enable this feature.
   
   What about Optional<Long> ?

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
##########
@@ -245,4 +245,20 @@
      * @since 2.8.0
      */
     void release();
+
+    /**
+     * Get broker publish time from broker entry metadata.
+     *
+     * @since 2.9.0
+     * @return broker publish time from broker entry metadata
+     */
+    long getBrokerPublishTime();
+
+    /**
+     * Get index from broker entry metadata.
+     *
+     * @Since 2.9.0
+     * @return index from broker entry metadata
+     */
+    long getIndex();

Review comment:
       The same here.
   Also, can you briefly explain what is the meaning of this index?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-893218000


   > Thanks for your contribution. For this PR, do we need to update docs?
   > 
   > (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)
   
   Thanks. "Broker Entry Metadata" is a new feature since 2.8.0, but I haven't found any doc about it for now. Maybe it's better to add some docs for it? (Not just for my PR)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-893269261


   @LeBW you can ping me to review and look forward to your contribution, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-901523001


   @LeBW thanks for your contribution! Please do not forget to add docs accordingly to allow users to know your great code changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-893258733


   @LeBW thanks for your suggestion. Would you like to add some docs from your perspective?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r691027666



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,13 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()

Review comment:
       Make sense, even if the client with `ProtocolVersion.v18` but does not support get broker entry metadata, the broker can skip the broker entry metadata to reduce the network workload.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-897539348


   @eolivelli Please help review the PR again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#issuecomment-893056582


   Thanks for your contribution. For this PR, do we need to update docs?
   
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks) 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r684015922



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
##########
@@ -245,4 +245,20 @@
      * @since 2.8.0
      */
     void release();
+
+    /**
+     * Get broker publish time from broker entry metadata.

Review comment:
       Thanks. I've changed the return value from long to Optional<Long> and improve the doc.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r691027666



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,13 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()

Review comment:
       Make sense, even if the client with `ProtocolVersion.v18` but does not support get broker entry metadata, the broker can skip the broker entry metadata to reduce the network workload.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r685620695



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -663,6 +664,22 @@ public void release() {
         }
     }
 
+    @Override
+    public Optional<Long> getBrokerPublishTime() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
+            return Optional.of(brokerEntryMetadata.getBrokerTimestamp());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Long> getIndex() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {

Review comment:
       Thank you all. Let me fix it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r684016670



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
##########
@@ -245,4 +245,20 @@
      * @since 2.8.0
      */
     void release();
+
+    /**
+     * Get broker publish time from broker entry metadata.
+     *
+     * @since 2.9.0
+     * @return broker publish time from broker entry metadata
+     */
+    long getBrokerPublishTime();
+
+    /**
+     * Get index from broker entry metadata.
+     *
+     * @Since 2.9.0
+     * @return index from broker entry metadata
+     */
+    long getIndex();

Review comment:
       Thanks. The index is from [PR-9039](https://github.com/apache/pulsar/pull/11553), which provides continuous message sequence-Id for messages in one topic-partition.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r687385573



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -273,6 +274,78 @@ public void testGetLastMessageId() throws Exception {
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscriptionName(subscription)
                 .subscribe();
-        consumer.getLastMessageId();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForIndividualMessage() throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        long sendTime = System.currentTimeMillis();
+
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.send(String.valueOf(i).getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            Message<byte[]> received = consumer.receive();
+            Assert.assertTrue(
+                    received.hasBrokerPublishTime() && received.getBrokerPublishTime().orElse(-1L) >= sendTime);
+            Assert.assertTrue(received.hasIndex() && received.getIndex().orElse(-1L) == i);
+        }
+        producer.close();
+        consumer.close();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForBatchMessage() throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)

Review comment:
       The default batching delay is 1ms, it's better to increase the batching delay to such as 1min and force flush the producer, because if we are running the test on a machine which with leak resource, we might get only 1 message in a batch.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -663,6 +664,36 @@ public void release() {
         }
     }
 
+    @Override
+    public boolean hasBrokerPublishTime() {
+        return brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp();
+    }
+
+    @Override
+    public Optional<Long> getBrokerPublishTime() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
+            return Optional.of(brokerEntryMetadata.getBrokerTimestamp());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public boolean hasIndex() {
+        return brokerEntryMetadata != null && brokerEntryMetadata.hasIndex();
+    }
+
+    @Override
+    public Optional<Long> getIndex() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+            if (msgMetadata.hasNumMessagesInBatch() && messageId instanceof BatchMessageIdImpl) {
+                int batchIndex = ((BatchMessageIdImpl) messageId).getBatchIndex();
+                return Optional.of(brokerEntryMetadata.getIndex() - batchIndex);

Review comment:
       If the index is 10 and the batch size is 5, we will get 10, 9, 8, 7, 6 for these 5 messages?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -273,6 +274,78 @@ public void testGetLastMessageId() throws Exception {
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscriptionName(subscription)
                 .subscribe();
-        consumer.getLastMessageId();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForIndividualMessage() throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        long sendTime = System.currentTimeMillis();
+
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.send(String.valueOf(i).getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            Message<byte[]> received = consumer.receive();
+            Assert.assertTrue(
+                    received.hasBrokerPublishTime() && received.getBrokerPublishTime().orElse(-1L) >= sendTime);
+            Assert.assertTrue(received.hasIndex() && received.getIndex().orElse(-1L) == i);
+        }
+        producer.close();
+        consumer.close();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForBatchMessage() throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)

Review comment:
       I think we can send 5 messages and flush them and send 5 messages again, to flush again, make sure we got 2 batch message and 10 individual messages.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r685619101



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -663,6 +664,22 @@ public void release() {
         }
     }
 
+    @Override
+    public Optional<Long> getBrokerPublishTime() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
+            return Optional.of(brokerEntryMetadata.getBrokerTimestamp());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Long> getIndex() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {

Review comment:
       Agree with @codelipenghui ,  for a batch message, we need to caculate the  individual index for each message from the index  of Entry and batch size




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r691230182



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,13 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()

Review comment:
       > It is better to use the `FeatureFlag` mechanism instead of checking the `ProtocolVersion`. This will ensure clients can phase in with the support of this feature.
   
   Thanks for your suggestion. Maybe it's better to check the `FeatureFlag` and the `ProtocolVersion` Both?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r686933152



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
##########
@@ -245,4 +245,22 @@
      * @since 2.8.0
      */
     void release();
+
+    /**
+     * Get broker publish time from broker entry metadata.
+     * Note that only if the feature is enabled in the broker then the value is available.
+     *
+     * @since 2.9.0
+     * @return broker publish time from broker entry metadata, or empty if the feature is not enabled in the broker.
+     */
+    Optional<Long> getBrokerPublishTime();

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r686933436



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1236,8 +1240,9 @@ private void interceptAndComplete(final Message<T> message, final CompletableFut
         completePendingReceive(receivedFuture, interceptMessage);
     }
 
-    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
-            MessageIdData messageId, ClientCnx cnx) {
+    void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata,

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r687870069



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,12 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()
+                        || !cnx.isExposingBrokerEntryMetadataToClientEnabled()) {

Review comment:
       Thanks for your suggestion. I've removed it in ServerCnx




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r685619101



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -663,6 +664,22 @@ public void release() {
         }
     }
 
+    @Override
+    public Optional<Long> getBrokerPublishTime() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
+            return Optional.of(brokerEntryMetadata.getBrokerTimestamp());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Long> getIndex() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {

Review comment:
       Agree with @codelipenghui ,  for a batch message, we need to caculate the  individual index for each message from the Entry index and batch size




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r691704068



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,13 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()

Review comment:
       > It is better to use the `FeatureFlag` mechanism instead of checking the `ProtocolVersion`. This will ensure clients can phase in with the support of this feature.
   
   PIP-70 adds a feature flag `supports_broker_entry_metadata`, so I use it to check here. Can you review it again?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r690962783



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -243,8 +243,13 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
                 // increment ref-count of data and release at the end of process:
                 // so, we can get chance to call entry.release
                 metadataAndPayload.retain();
-                // skip raw message metadata since broker timestamp only used in broker side
-                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
+                // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
+                // features is not enabled
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()

Review comment:
       that's interesting. I wasn't aware of that mechanism




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] LeBW commented on a change in pull request #11553: Exposing the broker entry metadata to client

Posted by GitBox <gi...@apache.org>.
LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r684866006



##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -1005,6 +1005,11 @@
             doc = "List of interceptors for entry metadata.")
     private Set<String> brokerEntryMetadataInterceptors = new HashSet<>();
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Enable or disable exposing broker entry metadata to client.")
+    private boolean enableExposingBrokerEntryMetadataToClient = false;

Review comment:
       Thanks. Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org