You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/22 08:17:58 UTC

[GitHub] [pulsar] aloyszhang opened a new issue #11013: Field 'publish_time' is not set

aloyszhang opened a new issue #11013:
URL: https://github.com/apache/pulsar/issues/11013


   **Describe the bug**
   
   KoP deserialize error after  upgrade from 2.7.2 to 2.8.0.
   Similiar to https://github.com/apache/pulsar/issues/9467
   
   ```shell
   [BookKeeperClientWorker-OrderedExecutor-0-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor - [persistent://public/__kafka/__consumer_offsets-partition-48][reader-f267e09ef1] Error deserializing message for expiry check
   java.lang.IllegalStateException: Field 'publish_time' is not set
    at org.apache.pulsar.common.api.proto.MessageMetadata.getPublishTime(MessageMetadata.java:76) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
    at org.apache.pulsar.client.impl.MessageImpl.getPublishTime(MessageImpl.java:335) ~[org.apache.pulsar-pulsar-client-original-2.8.0.jar:2.8.0]
    at org.apache.pulsar.client.impl.MessageImpl.isExpired(MessageImpl.java:349) ~[org.apache.pulsar-pulsar-client-original-2.8.0.jar:2.8.0]
    at org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor.lambda$expireMessages$0(PersistentMessageExpiryMonitor.java:79) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
    at org.apache.bookkeeper.mledger.impl.OpFindNewest.readEntryComplete(OpFindNewest.java:89) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
    at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$0(EntryCacheImpl.java:229) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) [?:1.8.0_221]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) [?:1.8.0_221]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower closed issue #11013: Field 'publish_time' is not set

Posted by GitBox <gi...@apache.org>.
BewareMyPower closed issue #11013:
URL: https://github.com/apache/pulsar/issues/11013


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on issue #11013: Field 'publish_time' is not set

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on issue #11013:
URL: https://github.com/apache/pulsar/issues/11013#issuecomment-867684484


   @BewareMyPower @hangc0276 
   
   This is problem happends when broker entry metadata is enabled without `AppendBrokerTimestampMetadataInterceptor`.
   
   Let's see the logic of expire message
   ```java
     try {
                       msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
                       return msg.isExpired(messageTTLInSeconds);
                   } catch (Exception e) {
                       log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
                   }
   ```
    we will get an entry and get the `brokerEntryTimestamp` or the `publishTime` to compare with the message-ttl.
   
   And if broker entry metadata is enabled without `AppendBrokerTimestampMetadataInterceptor`.
   
   After`MessageImpl.deserializeBrokerEntryMetaDataFirst()`
   ```java
    msg.brokerEntryMetadata =
                   Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
           if (msg.brokerEntryMetadata != null ) {
               msg.msgMetadata.clear();
               msg.payload = null;
               msg.messageId = null;
               msg.topic = null;
               msg.cnx = null;
               msg.properties = Collections.emptyMap();
               return msg;
           }
   ```
   we will get a `MessageImpl`, this  `MessageImpl` has brokerEntryMetadata but does not have `brokerTimestamp` and this  `MessageImpl` has no `msgMetadata` since this field has been cleared which mean we can't get the `publish_time`. 
   
   At last, when checking expiry, 
   ```java
    public boolean isExpired(int messageTTLInSeconds) {
           return messageTTLInSeconds != 0 && (brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp()
                   ? (System.currentTimeMillis() >
                       getPublishTime() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds))
                   : (System.currentTimeMillis() >
                       brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
       }
   ```
   broker will throw `java.lang.IllegalStateException: Field 'publish_time' is not set`.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org