Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/22 08:17:58 UTC
[GitHub] [pulsar] aloyszhang opened a new issue #11013: Field 'publish_time' is not set
aloyszhang opened a new issue #11013:
URL: https://github.com/apache/pulsar/issues/11013
**Describe the bug**
KoP hits a deserialization error after upgrading from 2.7.2 to 2.8.0.
Similar to https://github.com/apache/pulsar/issues/9467
```shell
[BookKeeperClientWorker-OrderedExecutor-0-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor - [persistent://public/__kafka/__consumer_offsets-partition-48][reader-f267e09ef1] Error deserializing message for expiry check
java.lang.IllegalStateException: Field 'publish_time' is not set
at org.apache.pulsar.common.api.proto.MessageMetadata.getPublishTime(MessageMetadata.java:76) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.client.impl.MessageImpl.getPublishTime(MessageImpl.java:335) ~[org.apache.pulsar-pulsar-client-original-2.8.0.jar:2.8.0]
at org.apache.pulsar.client.impl.MessageImpl.isExpired(MessageImpl.java:349) ~[org.apache.pulsar-pulsar-client-original-2.8.0.jar:2.8.0]
at org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor.lambda$expireMessages$0(PersistentMessageExpiryMonitor.java:79) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.mledger.impl.OpFindNewest.readEntryComplete(OpFindNewest.java:89) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$0(EntryCacheImpl.java:229) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) [?:1.8.0_221]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) [?:1.8.0_221]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_221]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower closed issue #11013: Field 'publish_time' is not set
Posted by GitBox <gi...@apache.org>.
BewareMyPower closed issue #11013:
URL: https://github.com/apache/pulsar/issues/11013
[GitHub] [pulsar] aloyszhang commented on issue #11013: Field 'publish_time' is not set
Posted by GitBox <gi...@apache.org>.
aloyszhang commented on issue #11013:
URL: https://github.com/apache/pulsar/issues/11013#issuecomment-867684484
@BewareMyPower @hangc0276
This problem happens when broker entry metadata is enabled without `AppendBrokerTimestampMetadataInterceptor`.
Let's look at the message expiry logic:
```java
try {
    msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
    return msg.isExpired(messageTTLInSeconds);
} catch (Exception e) {
    log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
}
```
The expiry monitor reads an entry and compares either the `brokerEntryTimestamp` or the `publishTime` against the message TTL.
When broker entry metadata is enabled without `AppendBrokerTimestampMetadataInterceptor`, `MessageImpl.deserializeBrokerEntryMetaDataFirst()` takes the following branch:
```java
msg.brokerEntryMetadata =
        Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
if (msg.brokerEntryMetadata != null ) {
    msg.msgMetadata.clear();
    msg.payload = null;
    msg.messageId = null;
    msg.topic = null;
    msg.cnx = null;
    msg.properties = Collections.emptyMap();
    return msg;
}
```
The result is a `MessageImpl` that has `brokerEntryMetadata` but no `brokerTimestamp`, and whose `msgMetadata` has been cleared, which means `publish_time` can no longer be read.
Finally, when checking expiry,
```java
public boolean isExpired(int messageTTLInSeconds) {
    return messageTTLInSeconds != 0 && (brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp()
            ? (System.currentTimeMillis() >
                    getPublishTime() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds))
            : (System.currentTimeMillis() >
                    brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
}
```
the broker falls back to `getPublishTime()` because no broker timestamp is present, and throws `java.lang.IllegalStateException: Field 'publish_time' is not set`.
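
To make the sequence easier to follow, here is a minimal, self-contained sketch of the same failure path. It does not use any Pulsar classes: `ExpiryCheckSketch`, `Metadata`, and `BrokerEntryMetadata` are hypothetical stand-ins written only for illustration, and `isExpired` takes the current time as a parameter (the real method calls `System.currentTimeMillis()`) so the behavior is deterministic. It is a model of the logic described above, not the broker's actual implementation.

```java
import java.util.concurrent.TimeUnit;

public class ExpiryCheckSketch {

    /** Hypothetical stand-in for MessageMetadata: reading an unset field throws. */
    static final class Metadata {
        private Long publishTime; // null models "field not set"

        void setPublishTime(long millis) { publishTime = millis; }
        void clear() { publishTime = null; }
        long getPublishTime() {
            if (publishTime == null) {
                throw new IllegalStateException("Field 'publish_time' is not set");
            }
            return publishTime;
        }
    }

    /** Hypothetical stand-in for BrokerEntryMetadata with an optional timestamp. */
    static final class BrokerEntryMetadata {
        private final Long brokerTimestamp; // null models "no timestamp interceptor configured"

        BrokerEntryMetadata(Long brokerTimestamp) { this.brokerTimestamp = brokerTimestamp; }
        boolean hasBrokerTimestamp() { return brokerTimestamp != null; }
        long getBrokerTimestamp() { return brokerTimestamp; }
    }

    /** Mirrors the quoted branch: message metadata is cleared whenever broker entry metadata exists. */
    static Metadata deserializeBrokerEntryMetadataFirst(Metadata md, BrokerEntryMetadata bem) {
        if (bem != null) {
            md.clear();
        }
        return md;
    }

    /** Same decision as the quoted isExpired, with the clock passed in as an argument. */
    static boolean isExpired(Metadata md, BrokerEntryMetadata bem, int ttlSeconds, long nowMillis) {
        if (ttlSeconds == 0) {
            return false;
        }
        long ttlMillis = TimeUnit.SECONDS.toMillis(ttlSeconds);
        if (bem == null || !bem.hasBrokerTimestamp()) {
            // Falls back to publish_time, which throws if the metadata was cleared.
            return nowMillis > md.getPublishTime() + ttlMillis;
        }
        return nowMillis > bem.getBrokerTimestamp() + ttlMillis;
    }

    public static void main(String[] args) {
        Metadata md = new Metadata();
        md.setPublishTime(1_000L);
        // Broker entry metadata is present, but carries no broker timestamp.
        BrokerEntryMetadata noTimestamp = new BrokerEntryMetadata(null);

        Metadata afterDeserialize = deserializeBrokerEntryMetadataFirst(md, noTimestamp);
        try {
            isExpired(afterDeserialize, noTimestamp, 1, 10_000L);
            System.out.println("no exception");
        } catch (IllegalStateException e) {
            System.out.println("expiry check failed: " + e.getMessage());
        }
    }
}
```

In this model the exception disappears as soon as the broker entry metadata carries a timestamp, which matches the observation that the error only shows up when `AppendBrokerTimestampMetadataInterceptor` is not configured.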