You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/05 11:38:26 UTC

[GitHub] [pulsar] hangc0276 opened a new pull request #9493: fix broker_timestamp not set bug

hangc0276 opened a new pull request #9493:
URL: https://github.com/apache/pulsar/pull/9493


   ### Motivation
   When configure broker.conf with 
   `brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor` 
   and run KOP with Pulsar Broker, we get the following exception:
   ```
   16:44:20.374 [pulsar-msg-expiry-monitor-27-1] WARN  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/__kafka/__consumer_offsets-partition-36] Error while getting the oldest mess
   age
   java.lang.IllegalStateException: Field 'broker_timestamp' is not set
           at org.apache.pulsar.common.api.proto.BrokerEntryMetadata.getBrokerTimestamp(BrokerEntryMetadata.java:14) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at org.apache.pulsar.client.impl.MessageImpl.isExpired(MessageImpl.java:299) ~[org.apache.pulsar-pulsar-client-original-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:2193) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:891) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$34(PersistentTopic.java:1245) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at java.util.ArrayList.forEach(ArrayList.java:1257) ~[?:1.8.0_172]
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1245) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_172]
           at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$51(BrokerService.java:1472) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:1470) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:1411) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
           at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.12.1.jar:4.12.1]
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_172]
           at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_172]
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_172]
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_172]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.51.Final.jar:4.1.51.Final]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
   ```
   
   The reason is that it doesn't check whether `brokerTimestamp` has been set in brokerEntryMetadata, which is set by `
   org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor`
   ```Java
   public boolean isExpired(int messageTTLInSeconds) {
           return messageTTLInSeconds != 0 && (brokerEntryMetadata == null
                   ? (System.currentTimeMillis() >
                       getPublishTime() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds))
                   : (System.currentTimeMillis() >
                       brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
       }
   ```
   
   ### Modification
   1. check whether brokerTimestamp has been set before call `getBrokerTimestamp` method.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #9493: fix broker_timestamp not set bug

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #9493:
URL: https://github.com/apache/pulsar/pull/9493#issuecomment-773995915


   Good job! But I have a question, can you reproduce this scenario in KoP's test? I found currently KoP tests enabled both two `BrokerEntryMetadataInterceptor`s while the `AppendBrokerTimestampMetadataInterceptor` is not necessary. However, after I removed the interceptor as followed
   
   ```java
       public static void addBrokerEntryMetadataInterceptors(ServiceConfiguration configuration) {
           Set<String> interceptorNames = new HashSet<>();
           //interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
           interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
           configuration.setBrokerEntryMetadataInterceptors(interceptorNames);
       }
   ```
   
   The KoP tests still passed.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9493: fix broker_timestamp not set bug

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9493:
URL: https://github.com/apache/pulsar/pull/9493#issuecomment-774012504


   > ```java
   > addBrokerEntryMetadataInterceptors
   > ```
   @BewareMyPower The exception is triggered not by KOP, but by message expire check monitor, it will check every 5 minutes by default. When we not set `AppendBrokerTimestampMetadataInterceptor` interceptor, the expire monitor checker will read the entry metadata without `brokerTimestamp`.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 edited a comment on pull request #9493: fix broker_timestamp not set bug

Posted by GitBox <gi...@apache.org>.
hangc0276 edited a comment on pull request #9493:
URL: https://github.com/apache/pulsar/pull/9493#issuecomment-774092270


   > @hangc0276 Is it possible to add a test to avoid the regression?
   
   @codelipenghui OK, I will add the test.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9493: fix broker_timestamp not set bug

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9493:
URL: https://github.com/apache/pulsar/pull/9493#issuecomment-774092270


   > @hangc0276 Is it possible to add a test to avoid the regression?
   
   OK, I will add the test.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #9493: fix broker_timestamp not set bug

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #9493:
URL: https://github.com/apache/pulsar/pull/9493


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org