Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/26 03:42:39 UTC

[GitHub] [pulsar] fayce66 opened a new issue #9317: pulsar c++ topic compaction with message key does not seem to work

fayce66 opened a new issue #9317:
URL: https://github.com/apache/pulsar/issues/9317


   C++ consumer configured for topic compaction still receives all messages in the log instead of the last one for each topic key.
   
   1. The c++ producer:
   
   ```
       std::string some_string;
       std::string some_topic_key;
       
       Client client(lookup_url_);
       Producer producer;
       ProducerConfiguration configuration;
       configuration.setCompressionType(compression_type_); // CompressionNone or CompressionZSTD
       auto result = client.createProducer(topic_, configuration, producer);
       if (result != ResultOk) {
           // print error
           return -1;
       }
       // The partition key is the key that topic compaction groups messages by.
       auto msg = MessageBuilder()
               .setContent(some_string.data(), some_string.size())
               .setPartitionKey(some_topic_key)
               .build();
       auto res = producer.send(msg);
       if (res != ResultOk) {
           // print error
           return -1;
       }
   ```
   
   2. The c++ consumer:
   
   ```
       Client client(lookup_url_);
       Consumer consumer;
       ConsumerConfiguration configuration;
       configuration.setConsumerType(consumer_type_); // ConsumerExclusive or ConsumerFailover
       if (consumer_type_ == pulsar::ConsumerShared) {
           // cannot have topic compaction with shared subscription ?
           configuration.setReadCompacted(false);
       } else {
           // set topic compaction
           configuration.setReadCompacted(true);
       }
       auto result = client.subscribe(topic_, subscription_name_, configuration, consumer);
       if (result != ResultOk) {
           // print error
           return -1;
       }
   
       Message msg;
       while (true) {
           // stop on receive failure instead of acknowledging a stale message
           if (consumer.receive(msg) != ResultOk) {
               break;
           }
           consumer.acknowledge(msg);
       }
   ```
   3. Set manual topic compaction with pulsar-admin:
   `$ bin/pulsar-admin topics compact "persistent://marianas/alphatrader/wing-calibration"`
   
   Environment: Ubuntu 20.04 desktop for the clients (producer & consumer), pulsar-daemon running on CentOS 6.10
   
   I checked the messages received by the pulsar-client and the topic keys are properly set:
   
   ----- got message -----
   key:[fek.wing.ks102], properties:[], content:{"some string"}
   ----- got message -----
   key:[fek.wing.ks103], properties:[], content:{"some string"}
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] fayce66 commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768657251


   It still does not work for me. I simplified my code to get closer to your example and even hard-coded the partition key, with the same result. I run the producer for 1 or 2 minutes to produce a few messages, then launch the consumer expecting only the last message, but I get:
   
   ```
   PM|9827|9839|09:47:07.429047|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429175|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429206|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429228|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429256|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429277|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429296|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429322|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   ```
   Also, if I stop the producer and let the consumer receive all the messages, then restart just the consumer to receive the last message with partition key `test_key`, I get no messages at all, just as I would for a subscription without topic compaction.
   
   Did you use a newer version of Pulsar than 2.7.0? That is the version I am using now.





[GitHub] [pulsar] fayce66 edited a comment on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 edited a comment on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768657251


   It still does not work for me. I simplified my code to get closer to your example and even hard-coded the partition key, with the same result. I run the producer for 1 or 2 minutes to produce a few messages, then launch the consumer expecting only the last message, but I get:
   
   ```
   PM|9827|9839|09:47:07.429047|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429175|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429206|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429228|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429256|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429277|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429296|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429322|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   ```
   Also, if I stop the producer and let the consumer receive all the messages, then restart just the consumer with the same subscription to receive the last message with partition key `test_key`, I get no messages at all, just as I would for a subscription without topic compaction.
   
   Did you use a newer version of Pulsar than 2.7.0? That is the version I am using now.
   
   I noticed that you are using `result = producer.send(msg, id);`, which does not exist in the Pulsar C++ 2.7.0 client.
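
One plausible reading of the empty restart, offered as an assumption rather than something this thread confirms: a durable subscription remembers what has already been acknowledged, so a consumer that reconnects under the same subscription name only receives messages it has not acknowledged yet, compacted topic or not. The toy model below is a minimal sketch of that behavior; `ToyTopic` and `receiveAndAckAll` are hypothetical names, not part of the Pulsar C++ client, and unlike Pulsar's default the toy starts new subscriptions at the beginning of the log.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical in-memory topic with named durable subscriptions.
// This is an illustration only; it does not use or mirror the Pulsar API.
class ToyTopic {
public:
    void publish(const std::string& payload) { log_.push_back(payload); }

    // Delivers every message after the subscription's acknowledged position
    // and acknowledges all of them, like a receive/acknowledge loop.
    std::vector<std::string> receiveAndAckAll(const std::string& subscription) {
        // Assumption: a new subscription starts at position 0 (the earliest message).
        std::size_t& cursor = cursors_[subscription];
        std::vector<std::string> delivered(
            log_.begin() + static_cast<std::ptrdiff_t>(cursor), log_.end());
        cursor = log_.size();  // everything delivered is now acknowledged
        return delivered;
    }

private:
    std::vector<std::string> log_;
    std::map<std::string, std::size_t> cursors_;  // subscription name -> acked position
};
```

In this model a second call with the same subscription name returns nothing, while a call with a fresh subscription name replays the whole log.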
   





[GitHub] [pulsar] fayce66 commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768861343


   OK, I reran the `pulsar-admin topics compact` command and I can indeed see some `__compaction` messages in the log now. I restarted my client and it looks like it is working. I did the same thing twice yesterday and it did not work, which is strange. FYI, I stop and restart the pulsar-daemon every morning. Are the configurations saved, or do I have to re-run the topics commands every time I restart the broker?
   
   Here is my log; I think the compaction is working now:
   
   ```
   16:14:05.400 [pulsar-web-68-12] INFO  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Trigger compaction on topic persistent://marianas/alphatrader/wing-calibration
   16:14:05.421 [ForkJoinPool.commonPool-worker-29] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
     "topicNames" : [ "persistent://marianas/alphatrader/wing-calibration" ],
     "topicsPattern" : null,
     "subscriptionName" : "__compaction",
     "subscriptionType" : "Exclusive",
     "subscriptionMode" : "Durable",
     "receiverQueueSize" : 1000,
     "acknowledgementsGroupTimeMicros" : 100000,
     "negativeAckRedeliveryDelayMicros" : 60000000,
     "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
     "consumerName" : null,
     "ackTimeoutMillis" : 0,
     "tickDurationMillis" : 1000,
     "priorityLevel" : 0,
     "maxPendingChuckedMessage" : 10,
     "autoAckOldestChunkedMessageOnQueueFull" : false,
     "expireTimeOfIncompleteChunkedMessageMillis" : 60000,
     "cryptoFailureAction" : "FAIL",
     "properties" : { },
     "readCompacted" : true,
     "subscriptionInitialPosition" : "Latest",
     "patternAutoDiscoveryPeriod" : 60,
     "regexSubscriptionMode" : "PersistentOnly",
     "deadLetterPolicy" : null,
     "retryEnable" : false,
     "autoUpdatePartitions" : true,
     "autoUpdatePartitionsIntervalSeconds" : 60,
     "replicateSubscriptionState" : false,
     "resetIncludeHead" : false,
     "keySharedPolicy" : null,
     "batchIndexAckEnabled" : false
   }
   16:14:05.425 [ForkJoinPool.commonPool-worker-29] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
     "serviceUrl" : "pulsar://localhost:6650",
     "authPluginClassName" : "org.apache.pulsar.client.impl.auth.AuthenticationDisabled",
     "operationTimeoutMs" : 30000,
     "statsIntervalSeconds" : 60,
     "numIoThreads" : 1,
     "numListenerThreads" : 1,
     "connectionsPerBroker" : 1,
     "useTcpNoDelay" : true,
     "useTls" : false,
     "tlsTrustCertsFilePath" : null,
     "tlsAllowInsecureConnection" : false,
     "tlsHostnameVerificationEnable" : false,
     "concurrentLookupRequest" : 5000,
     "maxLookupRequest" : 50000,
     "maxLookupRedirects" : 20,
     "maxNumberOfRejectedRequestPerConnection" : 50,
     "keepAliveIntervalSeconds" : 30,
     "connectionTimeoutMs" : 10000,
     "requestTimeoutMs" : 60000,
     "initialBackoffIntervalNanos" : 100000000,
     "maxBackoffIntervalNanos" : 60000000000,
     "listenerName" : null,
     "useKeyStoreTls" : false,
     "sslProvider" : null,
     "tlsTrustStoreType" : "JKS",
     "tlsTrustStorePath" : null,
     "tlsTrustStorePassword" : null,
     "tlsCiphers" : [ ],
     "tlsProtocols" : [ ],
     "proxyServiceUrl" : null,
     "proxyProtocol" : null,
     "enableTransaction" : false
   }
   16:14:05.432 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650]] Connected to server
   16:14:05.432 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:42032
   16:14:05.436 [ForkJoinPool.commonPool-worker-29] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [28/Jan/2021:16:14:05 +0900] "PUT /admin/v2/persistent/marianas/alphatrader/wing-calibration/compaction HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.7.0" 38
   16:14:05.437 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribing to topic on cnx [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650], consumerId 0
   16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Subscribing on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:05.437 [pulsar-io-50-41] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration-__compaction] Rewind from 13911:132 to 13911:132
   16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration] There are no replicated subscriptions on the topic
   16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration][__compaction] Created new subscription for 0
   16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Created subscription on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:05.438 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
   16:14:05.442 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Seek subscription to message id -1:-1:-1
   16:14:05.443 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
   16:14:05.443 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
   16:14:05.444 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
   16:14:05.444 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 0
   16:14:05.444 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Closed connection [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
   16:14:05.446 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] Initiate reset position to 12028:-1 on cursor __compaction
   16:14:05.448 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [marianas/alphatrader/persistent/wing-calibration] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[109, 97, 114, 105, 97, 110, 97, 115, 47, 97, 108, 112, 104, 97, 116, 114, 97, 100, 101, 114, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 119, 105, 110, 103, 45, 99, 97, 108, 105, 98, 114, 97, 116, 105, 111, 110], pulsar/cursor=[95, 95, 99, 111, 109, 112, 97, 99, 116, 105, 111, 110], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
   16:14:05.465 [main-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 19142
   16:14:05.475 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [marianas/alphatrader/persistent/wing-calibration] [__compaction] Updating cursor info ledgerId=19142 mark-delete=13911:131
   16:14:05.481 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] Updated cursor __compaction with ledger id 19142 md-position=13911:131 rd-position=13911:132
   16:14:05.484 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] reset position to 12028:-1 before current read position 13911:132 on cursor __compaction
   16:14:05.487 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] [persistent://marianas/alphatrader/wing-calibration][__compaction] Reset subscription to message id -1:-1
   16:14:05.487 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully reset subscription to message id -1:-1:-1
   16:14:05.546 [pulsar-timer-104-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Reconnecting after timeout
   16:14:05.548 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribing to topic on cnx [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650], consumerId 0
   16:14:05.548 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Subscribing on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:05.548 [pulsar-io-50-41] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration-__compaction] Rewind from 12028:-1 to 12028:0
   16:14:05.549 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration] There are no replicated subscriptions on the topic
   16:14:05.549 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration][__compaction] Created new subscription for 0
   16:14:05.549 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Created subscription on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:05.549 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
   16:14:05.590 [pulsar-external-listener-102-1] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Could not get connection while getLastMessageId -- Will try again in 100 ms
   16:14:05.592 [pulsar-external-listener-102-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Get topic last message Id
   16:14:05.614 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully getLastMessageId 18970:696
   16:14:05.614 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Get topic last message Id
   16:14:05.618 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully getLastMessageId 18970:696
   16:14:05.618 [pulsar-client-io-101-1] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase one of compaction for persistent://marianas/alphatrader/wing-calibration, reading to 18970:696:-1
   16:14:06.037 [io-write-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.statelib.impl.kv.RocksdbKVStore - Checkpoint local state store 000000000000000000/000000000000000000/000000000000000000 at revision -1
   16:14:06.037 [io-checkpoint-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.RocksdbCheckpointTask - Create a local checkpoint of state store 000000000000000000/000000000000000000/000000000000000000 at /home/faycal/Tools/apache-pulsar-2.7.0/data/standalone/bookkeeper/ranges/data/ranges/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85
   16:14:06.175 [main-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 19143
   16:14:06.176 [main-EventThread] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase two of compaction for persistent://marianas/alphatrader/wing-calibration, from 13911:130:-1:-1 to 18970:696:-1:-1, compacting 2 keys to ledger 19143
   16:14:06.178 [main-EventThread] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Seek subscription to message id 13911:130:-1:-1
   16:14:06.179 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
   16:14:06.179 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
   16:14:06.179 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
   16:14:06.179 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] Initiate reset position to 13911:130 on cursor __compaction
   16:14:06.179 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 0
   16:14:06.179 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Closed connection [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
   16:14:06.181 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] reset position to 13911:130 before current read position 18970:697 on cursor __compaction
   16:14:06.181 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] [persistent://marianas/alphatrader/wing-calibration][__compaction] Reset subscription to message id 13911:130
   16:14:06.181 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully reset subscription to message id 13911:130:-1:-1
   16:14:06.222 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator for /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved version from -1 to 0.
   16:14:06.226 [main-EventThread] INFO  org.apache.distributedlog.BKLogWriteHandler - Initiating Recovery For 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default> : []
   16:14:06.227 [io-checkpoint-scheduler-OrderedScheduler-0-0] INFO  org.apache.distributedlog.BKLogWriteHandler - Initiating Recovery For 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default> : []
   16:14:06.231 [DLM-/stream/storage-OrderedScheduler-0-0] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase ALLOCATING : version = 0.
   16:14:06.245 [DLM-/stream/storage-OrderedScheduler-3-0-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 19144
   16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator for /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved version from 0 to 1.
   16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase ALLOCATED : version = 1.
   16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase HANDING_OVER : version = 1.
   16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.BKLogWriteHandler - No max ledger sequence number found while creating log segment 1 for 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>.
   16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase HANDED_OVER : version = 1.
   16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator for /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved version from 1 to 2.
   16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase ALLOCATING : version = 2.
   16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.logsegment.PerStreamLogSegmentCache - 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017 added log segment (inprogress_000000000000000001 : [LogSegmentId:19144, firstTxId:202, lastTxId:-999, version:VERSION_V5_SEQUENCE_ID, completionTime:0, recordCount:0, regionId:0, status:0, logSegmentSequenceNumber:1, lastEntryId:-1, lastSlotId:-1, inprogress:true, minActiveDLSN:DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}, startSequenceId:-1]) to cache.
   16:14:06.277 [main-EventThread] INFO  org.apache.distributedlog.BKLogWriteHandler - Deleting log segments older than 1611558846272 for 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default> : []
   16:14:06.278 [DLM-/stream/storage-OrderedScheduler-13-0] INFO  org.apache.distributedlog.BKLogSegmentWriter - Flushing before closing log segment 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>:inprogress_000000000000000001
   16:14:06.280 [pulsar-timer-104-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Reconnecting after timeout
   16:14:06.280 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO  org.apache.distributedlog.BKLogSegmentWriter - Closing BKPerStreamLogWriter (abort=false) for 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>:inprogress_000000000000000001 : lastDLSN = DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} outstandingTransmits = 1 writesPendingTransmit = 0
   16:14:06.280 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO  org.apache.distributedlog.BKLogSegmentWriter - Stream 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>:inprogress_000000000000000001 aborted 0 writes
   16:14:06.281 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribing to topic on cnx [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650], consumerId 0
   16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Subscribing on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:06.282 [pulsar-io-50-41] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration-__compaction] Rewind from 13911:130 to 13911:130
   16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration] There are no replicated subscriptions on the topic
   16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration][__compaction] Created new subscription for 0
   16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Created subscription on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:06.282 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
   
   ```





[GitHub] [pulsar] fayce66 edited a comment on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 edited a comment on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768889040


   OK, so basically, when I restart my consumers, the best thing to do is to trigger topic compaction manually beforehand so that I get only the last message per key, correct? If it at least works like this, then that is fine, but the documentation is confusing. I thought compaction was done automatically behind the scenes as new messages are produced.
   
   > When you run compaction on a topic, Pulsar goes through a topic's backlog and removes messages that are obscured by later messages, i.e. it goes through the topic on a per-key basis and leaves only the most recent message associated with that key.
   
   The doc should say "when you run manual compaction on a topic"; that would be clearer.
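
   To make the quoted sentence concrete, here is a minimal sketch of what a single compaction pass does to a keyed log. It is only an in-memory model, not Pulsar's implementation: `KeyedMessage` and `compactLog` are hypothetical names introduced for illustration, and nothing in it comes from the Pulsar client library.

   ```cpp
   #include <cstddef>
   #include <string>
   #include <unordered_map>
   #include <vector>

   // Hypothetical stand-in for a keyed message; not a Pulsar client type.
   struct KeyedMessage {
       std::string key;
       std::string payload;
   };

   // Returns the compacted view of `log`: for each key only the most recent
   // message survives, and survivors keep their original relative order.
   std::vector<KeyedMessage> compactLog(const std::vector<KeyedMessage>& log) {
       // Pass 1: remember the index of the last message seen for each key.
       std::unordered_map<std::string, std::size_t> lastIndex;
       for (std::size_t i = 0; i < log.size(); ++i) {
           lastIndex[log[i].key] = i;
       }
       // Pass 2: keep a message only if it is the last one for its key.
       std::vector<KeyedMessage> compacted;
       for (std::size_t i = 0; i < log.size(); ++i) {
           if (lastIndex.at(log[i].key) == i) {
               compacted.push_back(log[i]);
           }
       }
       return compacted;
   }
   ```

   For the log `(a,1), (b,2), (a,3)` this yields `(b,2), (a,3)`: the first message for `a` is obscured by the later one. The pass only covers messages that already exist when it runs, which is why it has to be triggered (manually or automatically) again after new messages arrive.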



[GitHub] [pulsar] BewareMyPower commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-767280508


   @codelipenghui please assign to me



[GitHub] [pulsar] BewareMyPower commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768856896


   My earlier description might have been a little confusing.
   
   The logs I showed before were from the broker side, produced by the compaction command `bin/pulsar-admin topics compact xyz-topic`. The broker itself uses an internal `ConsumerImpl` to consume messages and then performs the compaction. I just wanted you to check whether your topic compaction was actually performed. The steps are easy:
   
   1. Run Pulsar standalone in the foreground with `bin/pulsar standalone`, so that you can see the output directly.
   2. Open a new terminal and run a producer to send some messages to `xyz-topic`.
   3. Run `bin/pulsar-admin topics compact xyz-topic` and watch the standalone output to check whether the compaction was performed.
   
   As for 
   > You can run a Java consumer to see whether it works.
   
   I just meant that if you're sure the compaction was done on the broker side, you can run a Java client application (`pulsar-client` does not seem to provide an option to enable `readCompacted`) to check whether the Java client reads only 1 message while the C++ client reads all messages.



[GitHub] [pulsar] BewareMyPower commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-769537811


   OK, I think you can close it. It looks like issues can only be closed by the author and committers.



[GitHub] [pulsar] fayce66 closed issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 closed issue #9317:
URL: https://github.com/apache/pulsar/issues/9317


   



[GitHub] [pulsar] BewareMyPower edited a comment on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768118453


   I've tested it in my local environment and it works well. Here are my detailed steps:
   
   1. Clone the latest pulsar repository.
   2. Change `SampleConsumer.cc` and `SampleProducer.cc` under `pulsar-client-cpp/examples` to
   
   ```c++
   // SampleProducer.cc (ignore the license header...)
   #include <pulsar/Client.h>
   #include <lib/LogUtils.h>
   
   DECLARE_LOG_OBJECT()
   using namespace pulsar;
   
   int main() {
       Client client("pulsar://localhost:6650");
       Producer producer;
       Result result = client.createProducer("xyz-topic", producer);
       if (result != ResultOk) {
           LOG_ERROR("Error creating producer: " << result);
           return -1;
       }
   
       // Send synchronously
       const std::string key = "my-key";
       for (int i = 0; i < 10; i++) {
           const auto msg = MessageBuilder().setContent("msg-" + std::to_string(i)).setPartitionKey(key).build();
           MessageId id;
           result = producer.send(msg, id);
           if (result == ResultOk) {
               LOG_INFO("Send " << msg << " to " << id);
           } else {
               LOG_ERROR("Failed to send " << msg << ": " << result);
               break;
           }
       }
   
       client.close();
       return 0;
   }
   ```
   
   ```c++
   #include <pulsar/Client.h>
   #include <lib/LogUtils.h>
   
   DECLARE_LOG_OBJECT()
   using namespace pulsar;
   
   int main(int argc, char* argv[]) {
       const std::string subName = (argc > 1) ? argv[1] : "my-sub";
       Client client("pulsar://localhost:6650");
       Consumer consumer;
       ConsumerConfiguration conf;
       conf.setReadCompacted(true);
       conf.setSubscriptionInitialPosition(InitialPositionEarliest);
       Result result = client.subscribe("xyz-topic", subName, conf, consumer);
       if (result != ResultOk) {
           LOG_ERROR("Failed to subscribe: " << result);
           return -1;
       }
   
       Message msg;
       while (true) {
           result = consumer.receive(msg, 1000);  // wait up to 1000 ms; reuses the outer `result` instead of shadowing it
           if (result == ResultTimeout) {
               break;
           }
           if (result != ResultOk) {
               LOG_ERROR("Failed to receive: " << result);
               return 1;
           }
           LOG_INFO("Receive: " << msg.getPartitionKey() << " => " << msg.getDataAsString() << " from "
                                << msg.getMessageId());
           consumer.acknowledge(msg);
       }
   
       client.close();
       return 0;
   }
   ```
   
   As you can see, `SampleProducer` sends 10 messages (`msg-0` to `msg-9`) with the same key `my-key` to topic `xyz-topic`, and `SampleConsumer` tries to consume all messages from the earliest position.
   
   3. Build Pulsar using `mvn clean install -DskipTests -Pcore-modules`.
   4. Run a test standalone service by running `pulsar-client-cpp/pulsar-test-service-start.sh`.
   5. Build the C++ client using `cmake`.
   6. Run `./examples/SampleProducer`.
   
   ```
   2021-01-27 16:21:28.153 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=0, publish_time=1611735688045, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,0,-1,0)
   2021-01-27 16:21:28.158 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=1, publish_time=1611735688153, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,1,-1,0)
   2021-01-27 16:21:28.164 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=2, publish_time=1611735688158, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,2,-1,0)
   2021-01-27 16:21:28.168 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=3, publish_time=1611735688164, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,3,-1,0)
   2021-01-27 16:21:28.175 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=4, publish_time=1611735688168, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,4,-1,0)
   2021-01-27 16:21:28.180 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=5, publish_time=1611735688175, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,5,-1,0)
   2021-01-27 16:21:28.185 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=6, publish_time=1611735688180, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,6,-1,0)
   2021-01-27 16:21:28.191 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=7, publish_time=1611735688185, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,7,-1,0)
   2021-01-27 16:21:28.195 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=8, publish_time=1611735688191, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,8,-1,0)
   2021-01-27 16:21:28.200 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=9, publish_time=1611735688195, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,9,-1,0)
   ```
   
   7. Run `./examples/SampleConsumer`
   
   ```
   2021-01-27 16:21:31.886 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-0 from (0,0,-1,0)
   2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-1 from (0,1,-1,0)
   2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-2 from (0,2,-1,0)
   2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-3 from (0,3,-1,0)
   2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-4 from (0,4,-1,0)
   2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-5 from (0,5,-1,0)
   2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-6 from (0,6,-1,0)
   2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-7 from (0,7,-1,0)
   2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-8 from (0,8,-1,0)
   2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-9 from (0,9,-1,0)
   ```
   
   8. Go to the project directory and run `bin/pulsar-admin topics compact xyz-topic` to compact this topic.
   9. Go back to C++ client build directory and run `./examples/SampleConsumer new-sub`
   
   ```
   2021-01-27 16:21:46.097 INFO  [0x700005d52000] ConsumerImpl:216 | [persistent://public/default/xyz-topic, new-sub, 0] Created consumer on broker [127.0.0.1:54843 -> 127.0.0.1:6650] 
   2021-01-27 16:21:46.114 INFO  [0x10ce3ddc0] SampleConsumer:49 | Receive: my-key => msg-9 from (0,9,-1,0)
   2021-01-27 16:21:47.116 INFO  [0x10ce3ddc0] ClientImpl:480 | Closing Pulsar client
   ```






[GitHub] [pulsar] fayce66 edited a comment on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 edited a comment on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768657251


   It still does not work for me. I tried to simplify my code to get closer to your example and even hard-coded the partition key, but it still does not work. I run the producer for 1 or 2 minutes to produce a few messages, then launch the consumer expecting only the last message, but I get:
   
   ```
   PM|9827|9839|09:47:07.429047|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429175|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429206|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429228|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429256|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429277|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429296|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   PM|9827|9839|09:47:07.429322|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
   ```
   Also, if I stop the producer and let the consumer receive all the messages, then restart just the consumer with the same subscription to receive the last message with the partition key `test_key`, I get no messages at all, as I would for a subscription without topic compaction... (By the way, you are using `conf.setSubscriptionInitialPosition(InitialPositionEarliest);`. Does that mean that, for a topic compaction subscription, it is absolutely necessary to set this configuration to receive the last message for my partition key?)
   
   Did you use a newer version of Pulsar than 2.7.0? That is the version I am using now.
   
   I noticed that you are using `result = producer.send(msg, id);`, which does not exist in Pulsar C++ 2.7.0.
   




[GitHub] [pulsar] fayce66 edited a comment on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 edited a comment on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768876195


   OK, so basically you have the same results as me now. Maybe it is my mistake, but I did not understand topic compaction to function like this. My understanding is that it should always return the last message per key, even if more messages have been produced in the meantime; that is the whole point. The consumer is not interested in past messages, only the last values, like a last-value queue... It defeats the purpose of topic compaction if we have to receive all messages the first time, and then only the last one the second time... What do you think? Can you try to restart consumer sub2 a second time without producing more messages? You should receive only the last message now...
   
   The way it works now, it is as if compaction stops for a given subscription at the last message produced while the subscription is alive. When new messages are produced, it is as if the topic compaction feature stays at the same messageId for that subscription, so that the subscription gets all the delta when restarted, and then the topic compaction is updated again...



[GitHub] [pulsar] fayce66 commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768866446


   OK, what I have noticed is the following: if my producer produces more messages while my consumer is not running, the first time I start the consumer, it receives all the missed messages for the subscription. If I stop and restart the consumer with the same subscription, it receives only the last message per key. Is that normal behavior? In your test, your producer does not produce more messages before the second subscription `new-sub` is started...
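
   One way to picture the first half of this observation is a simplified model in which a read-compacted consumer sees the compacted form of everything up to the point where the last compaction run finished, followed by every message published after that point, uncompacted. The sketch below is only that model, under the assumption that compaction runs as a separate step (for example via `bin/pulsar-admin topics compact`); `KeyedMessage`, `readCompactedView` and `horizon` are hypothetical names, not Pulsar APIs, and subscription cursors are deliberately left out.

   ```cpp
   #include <cstddef>
   #include <string>
   #include <unordered_map>
   #include <vector>

   // Hypothetical stand-in for a keyed message; not a Pulsar client type.
   struct KeyedMessage {
       std::string key;
       std::string payload;
   };

   // Models what a consumer reading from the earliest position would see:
   // the prefix [0, horizon) reduced to the last message per key, followed
   // by the raw tail [horizon, end) that no compaction run has covered yet.
   std::vector<KeyedMessage> readCompactedView(const std::vector<KeyedMessage>& log,
                                               std::size_t horizon) {
       if (horizon > log.size()) {
           horizon = log.size();  // a horizon past the end covers the whole log
       }
       // Find the last index of each key inside the compacted prefix.
       std::unordered_map<std::string, std::size_t> lastIndex;
       for (std::size_t i = 0; i < horizon; ++i) {
           lastIndex[log[i].key] = i;
       }
       std::vector<KeyedMessage> view;
       for (std::size_t i = 0; i < horizon; ++i) {
           if (lastIndex.at(log[i].key) == i) {
               view.push_back(log[i]);
           }
       }
       // Messages published after the last compaction run are delivered as-is.
       for (std::size_t i = horizon; i < log.size(); ++i) {
           view.push_back(log[i]);
       }
       return view;
   }
   ```

   With ten messages for `my-key` and `horizon = 10`, the view holds only `msg-9`. If three more messages are published before the next compaction run, the view becomes `msg-9, msg-10, msg-11, msg-12`, and it shrinks to a single message again only once compaction has covered the new tail.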




[GitHub] [pulsar] BewareMyPower commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768914361


   > OK, so basically when I restart my consumers, the best thing to do is to trigger topic compaction manually before I restarted them so that I get only the last message/key correct?
   
   Yes.
   
   > The doc should say "when you run manual compaction on a topic" that would be more clear.
   
   I think `manual` is redundant, because automatic compaction does the same thing as manual compaction. IMO, it should be emphasized that compaction here is an action that can be triggered. However, topic compaction is also a feature that is applied to a topic, so `run compaction on a topic` might be misunderstood as **applying this feature to a topic**.




[GitHub] [pulsar] fayce66 edited a comment on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 edited a comment on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768876195


   What do you mean by 'compact the topic' after producing 10 messages? Do you mean you run the pulsar-admin compact command again? If you started sub2 after the last 10 messages were produced, why is sub2 receiving everything if there's topic compaction?
   
   Maybe it's my mistake, but that is not how I understand topic compaction. My understanding is that it should always return the last message per key, even if more messages have been produced in the meantime; that is the whole point. The consumer is not interested in past messages, only the last values, like a last-value queue. It defeats the purpose of topic compaction if we have to receive all messages the first time, and then only the last one the second time. What do you think? Can you try to restart consumer sub2 a second time without producing more messages? You should receive only the last message now.
   
   The way it is working now, it's as if compaction stops for a given subscription at the last message produced while the subscription is alive. When new messages are produced, the topic compaction feature seems to stay at the same messageId for that subscription, so that the subscription gets the whole delta when restarted, and only then is the topic compaction updated again.
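
If an application needs last-value semantics regardless of when compaction last ran, one option is to fold whatever the consumer delivers into a per-key map on the client side, so that stale values are overwritten as newer ones arrive. The following is only a sketch of that idea: `LastValueCache` is a hypothetical application class, not something the Pulsar client provides, and it assumes every message carries a non-empty key.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical application-side cache; NOT part of the Pulsar client API.
class LastValueCache {
public:
    // Fold one received (key, payload) pair into the cache.
    // A later call for the same key overwrites the earlier value.
    void apply(const std::string& key, const std::string& payload) {
        assert(!key.empty());  // assumption of this sketch: messages are keyed
        latest_[key] = payload;
    }

    // Returns a pointer to the latest payload for `key`,
    // or nullptr if the key has never been seen.
    const std::string* get(const std::string& key) const {
        auto it = latest_.find(key);
        return it == latest_.end() ? nullptr : &it->second;
    }

    // Number of distinct keys seen so far.
    std::size_t size() const { return latest_.size(); }

private:
    std::map<std::string, std::string> latest_;
};
```

In a consumer loop like the one in the issue description, one would call `cache.apply(msg.getPartitionKey(), msg.getDataAsString())` for each received message and read from the cache once the backlog has been drained.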





[GitHub] [pulsar] BewareMyPower edited a comment on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768856896


   My earlier description might have been a little confusing.
   
   The logs I showed before were from the broker side, produced by the compaction command `bin/pulsar-admin topics compact xyz-topic`. The broker itself uses an internal `ConsumerImpl` to consume messages and then perform the compaction. I just wanted you to check whether your topic compaction was actually performed. The steps:
   
   1. Run Pulsar standalone in the foreground: `bin/pulsar standalone`, so that you can see the output directly.
   2. Open a new terminal and run a producer to send some messages to `xyz-topic`.
   3. Run `bin/pulsar-admin topics compact xyz-topic` and watch the standalone output to check whether the compaction was performed.
   
   As for 
   > You can run a Java consumer to see whether it works.
   
   I just meant that if you're sure the compaction was done on the broker side, you can run a Java client application (`pulsar-client` does not seem to provide an option to enable `readCompacted`) to check whether the Java client reads only 1 message while the C++ client reads all messages.





[GitHub] [pulsar] fayce66 edited a comment on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 edited a comment on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768861343


   OK, I reran the `pulsar-admin topics compact` command and I can indeed see some `__compaction` messages in the log now. I restarted my client and it looks like it is working. I did that twice yesterday and it did not work, which is strange. FYI, I stop and restart the pulsar-daemon every morning. Are the configurations saved, or do I have to re-run the topics commands every time I restart the broker?
   
   Here is my log. I think the compaction is working now, but I'm going to keep testing to make sure:
   
   ```
   16:14:05.400 [pulsar-web-68-12] INFO  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Trigger compaction on topic persistent://marianas/alphatrader/wing-calibration
   16:14:05.421 [ForkJoinPool.commonPool-worker-29] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
     "topicNames" : [ "persistent://marianas/alphatrader/wing-calibration" ],
     "topicsPattern" : null,
     "subscriptionName" : "__compaction",
     "subscriptionType" : "Exclusive",
     "subscriptionMode" : "Durable",
     "receiverQueueSize" : 1000,
     "acknowledgementsGroupTimeMicros" : 100000,
     "negativeAckRedeliveryDelayMicros" : 60000000,
     "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
     "consumerName" : null,
     "ackTimeoutMillis" : 0,
     "tickDurationMillis" : 1000,
     "priorityLevel" : 0,
     "maxPendingChuckedMessage" : 10,
     "autoAckOldestChunkedMessageOnQueueFull" : false,
     "expireTimeOfIncompleteChunkedMessageMillis" : 60000,
     "cryptoFailureAction" : "FAIL",
     "properties" : { },
     "readCompacted" : true,
     "subscriptionInitialPosition" : "Latest",
     "patternAutoDiscoveryPeriod" : 60,
     "regexSubscriptionMode" : "PersistentOnly",
     "deadLetterPolicy" : null,
     "retryEnable" : false,
     "autoUpdatePartitions" : true,
     "autoUpdatePartitionsIntervalSeconds" : 60,
     "replicateSubscriptionState" : false,
     "resetIncludeHead" : false,
     "keySharedPolicy" : null,
     "batchIndexAckEnabled" : false
   }
   16:14:05.425 [ForkJoinPool.commonPool-worker-29] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
     "serviceUrl" : "pulsar://localhost:6650",
     "authPluginClassName" : "org.apache.pulsar.client.impl.auth.AuthenticationDisabled",
     "operationTimeoutMs" : 30000,
     "statsIntervalSeconds" : 60,
     "numIoThreads" : 1,
     "numListenerThreads" : 1,
     "connectionsPerBroker" : 1,
     "useTcpNoDelay" : true,
     "useTls" : false,
     "tlsTrustCertsFilePath" : null,
     "tlsAllowInsecureConnection" : false,
     "tlsHostnameVerificationEnable" : false,
     "concurrentLookupRequest" : 5000,
     "maxLookupRequest" : 50000,
     "maxLookupRedirects" : 20,
     "maxNumberOfRejectedRequestPerConnection" : 50,
     "keepAliveIntervalSeconds" : 30,
     "connectionTimeoutMs" : 10000,
     "requestTimeoutMs" : 60000,
     "initialBackoffIntervalNanos" : 100000000,
     "maxBackoffIntervalNanos" : 60000000000,
     "listenerName" : null,
     "useKeyStoreTls" : false,
     "sslProvider" : null,
     "tlsTrustStoreType" : "JKS",
     "tlsTrustStorePath" : null,
     "tlsTrustStorePassword" : null,
     "tlsCiphers" : [ ],
     "tlsProtocols" : [ ],
     "proxyServiceUrl" : null,
     "proxyProtocol" : null,
     "enableTransaction" : false
   }
   16:14:05.432 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650]] Connected to server
   16:14:05.432 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:42032
   16:14:05.436 [ForkJoinPool.commonPool-worker-29] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [28/Jan/2021:16:14:05 +0900] "PUT /admin/v2/persistent/marianas/alphatrader/wing-calibration/compaction HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.7.0" 38
   16:14:05.437 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribing to topic on cnx [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650], consumerId 0
   16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Subscribing on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:05.437 [pulsar-io-50-41] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration-__compaction] Rewind from 13911:132 to 13911:132
   16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration] There are no replicated subscriptions on the topic
   16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration][__compaction] Created new subscription for 0
   16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Created subscription on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:05.438 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
   16:14:05.442 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Seek subscription to message id -1:-1:-1
   16:14:05.443 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
   16:14:05.443 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
   16:14:05.444 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
   16:14:05.444 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 0
   16:14:05.444 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Closed connection [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
   16:14:05.446 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] Initiate reset position to 12028:-1 on cursor __compaction
   16:14:05.448 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [marianas/alphatrader/persistent/wing-calibration] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[109, 97, 114, 105, 97, 110, 97, 115, 47, 97, 108, 112, 104, 97, 116, 114, 97, 100, 101, 114, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 119, 105, 110, 103, 45, 99, 97, 108, 105, 98, 114, 97, 116, 105, 111, 110], pulsar/cursor=[95, 95, 99, 111, 109, 112, 97, 99, 116, 105, 111, 110], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
   16:14:05.465 [main-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 19142
   16:14:05.475 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [marianas/alphatrader/persistent/wing-calibration] [__compaction] Updating cursor info ledgerId=19142 mark-delete=13911:131
   16:14:05.481 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] Updated cursor __compaction with ledger id 19142 md-position=13911:131 rd-position=13911:132
   16:14:05.484 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] reset position to 12028:-1 before current read position 13911:132 on cursor __compaction
   16:14:05.487 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] [persistent://marianas/alphatrader/wing-calibration][__compaction] Reset subscription to message id -1:-1
   16:14:05.487 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully reset subscription to message id -1:-1:-1
   16:14:05.546 [pulsar-timer-104-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Reconnecting after timeout
   16:14:05.548 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribing to topic on cnx [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650], consumerId 0
   16:14:05.548 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Subscribing on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:05.548 [pulsar-io-50-41] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration-__compaction] Rewind from 12028:-1 to 12028:0
   16:14:05.549 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration] There are no replicated subscriptions on the topic
   16:14:05.549 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration][__compaction] Created new subscription for 0
   16:14:05.549 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Created subscription on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:05.549 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
   16:14:05.590 [pulsar-external-listener-102-1] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Could not get connection while getLastMessageId -- Will try again in 100 ms
   16:14:05.592 [pulsar-external-listener-102-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Get topic last message Id
   16:14:05.614 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully getLastMessageId 18970:696
   16:14:05.614 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Get topic last message Id
   16:14:05.618 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully getLastMessageId 18970:696
   16:14:05.618 [pulsar-client-io-101-1] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase one of compaction for persistent://marianas/alphatrader/wing-calibration, reading to 18970:696:-1
   16:14:06.037 [io-write-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.statelib.impl.kv.RocksdbKVStore - Checkpoint local state store 000000000000000000/000000000000000000/000000000000000000 at revision -1
   16:14:06.037 [io-checkpoint-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.RocksdbCheckpointTask - Create a local checkpoint of state store 000000000000000000/000000000000000000/000000000000000000 at /home/faycal/Tools/apache-pulsar-2.7.0/data/standalone/bookkeeper/ranges/data/ranges/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85
   16:14:06.175 [main-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 19143
   16:14:06.176 [main-EventThread] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase two of compaction for persistent://marianas/alphatrader/wing-calibration, from 13911:130:-1:-1 to 18970:696:-1:-1, compacting 2 keys to ledger 19143
   16:14:06.178 [main-EventThread] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Seek subscription to message id 13911:130:-1:-1
   16:14:06.179 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
   16:14:06.179 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
   16:14:06.179 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
   16:14:06.179 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] Initiate reset position to 13911:130 on cursor __compaction
   16:14:06.179 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 0
   16:14:06.179 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Closed connection [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
   16:14:06.181 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] reset position to 13911:130 before current read position 18970:697 on cursor __compaction
   16:14:06.181 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] [persistent://marianas/alphatrader/wing-calibration][__compaction] Reset subscription to message id 13911:130
   16:14:06.181 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully reset subscription to message id 13911:130:-1:-1
   16:14:06.222 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator for /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved version from -1 to 0.
   16:14:06.226 [main-EventThread] INFO  org.apache.distributedlog.BKLogWriteHandler - Initiating Recovery For 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default> : []
   16:14:06.227 [io-checkpoint-scheduler-OrderedScheduler-0-0] INFO  org.apache.distributedlog.BKLogWriteHandler - Initiating Recovery For 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default> : []
   16:14:06.231 [DLM-/stream/storage-OrderedScheduler-0-0] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase ALLOCATING : version = 0.
   16:14:06.245 [DLM-/stream/storage-OrderedScheduler-3-0-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 19144
   16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator for /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved version from 0 to 1.
   16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase ALLOCATED : version = 1.
   16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase HANDING_OVER : version = 1.
   16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.BKLogWriteHandler - No max ledger sequence number found while creating log segment 1 for 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>.
   16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase HANDED_OVER : version = 1.
   16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator for /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved version from 1 to 2.
   16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase ALLOCATING : version = 2.
   16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.logsegment.PerStreamLogSegmentCache - 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017 added log segment (inprogress_000000000000000001 : [LogSegmentId:19144, firstTxId:202, lastTxId:-999, version:VERSION_V5_SEQUENCE_ID, completionTime:0, recordCount:0, regionId:0, status:0, logSegmentSequenceNumber:1, lastEntryId:-1, lastSlotId:-1, inprogress:true, minActiveDLSN:DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}, startSequenceId:-1]) to cache.
   16:14:06.277 [main-EventThread] INFO  org.apache.distributedlog.BKLogWriteHandler - Deleting log segments older than 1611558846272 for 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default> : []
   16:14:06.278 [DLM-/stream/storage-OrderedScheduler-13-0] INFO  org.apache.distributedlog.BKLogSegmentWriter - Flushing before closing log segment 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>:inprogress_000000000000000001
   16:14:06.280 [pulsar-timer-104-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Reconnecting after timeout
   16:14:06.280 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO  org.apache.distributedlog.BKLogSegmentWriter - Closing BKPerStreamLogWriter (abort=false) for 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>:inprogress_000000000000000001 : lastDLSN = DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} outstandingTransmits = 1 writesPendingTransmit = 0
   16:14:06.280 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO  org.apache.distributedlog.BKLogSegmentWriter - Stream 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>:inprogress_000000000000000001 aborted 0 writes
   16:14:06.281 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribing to topic on cnx [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650], consumerId 0
   16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Subscribing on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:06.282 [pulsar-io-50-41] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration-__compaction] Rewind from 13911:130 to 13911:130
   16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration] There are no replicated subscriptions on the topic
   16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration][__compaction] Created new subscription for 0
   16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Created subscription on topic persistent://marianas/alphatrader/wing-calibration / __compaction
   16:14:06.282 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
   
   ```
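
When checking a log like the one above, three lines are the ones to look for: `Trigger compaction on topic`, `Commencing phase one of compaction for`, and `Commencing phase two of compaction for`. A small sketch of a checker follows. It assumes the broker log has already been read into a vector of lines and that these message texts match your broker version (they are copied from the 2.7.0 log above). `CompactionTrace` and `traceCompaction` are hypothetical names, not part of any Pulsar tool.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Which compaction milestones were seen for one topic (hypothetical helper type).
struct CompactionTrace {
    bool triggered = false;  // "Trigger compaction on topic <topic>"
    bool phaseOne = false;   // "Commencing phase one of compaction for <topic>"
    bool phaseTwo = false;   // "Commencing phase two of compaction for <topic>"
};

// Scan already-loaded broker log lines for the three markers.
// Plain substring matching: a topic whose name is a prefix of another
// topic's name would also match that other topic's lines.
CompactionTrace traceCompaction(const std::vector<std::string>& lines,
                                const std::string& topic) {
    assert(!topic.empty());
    const std::string trigger = "Trigger compaction on topic " + topic;
    const std::string one = "Commencing phase one of compaction for " + topic;
    const std::string two = "Commencing phase two of compaction for " + topic;

    CompactionTrace trace;
    for (const std::string& line : lines) {
        if (line.find(trigger) != std::string::npos) {
            trace.triggered = true;
        } else if (line.find(one) != std::string::npos) {
            trace.phaseOne = true;
        } else if (line.find(two) != std::string::npos) {
            trace.phaseTwo = true;
        }
    }
    return trace;
}
```

A trace with `triggered` set but neither phase flag set would suggest the admin request arrived but the compactor did not start, which is the situation worth investigating first.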





[GitHub] [pulsar] BewareMyPower commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768870753


   I just did another test:
   
   1. Produce 10 messages.
   2. Compact the topic.
   3. Run consumer with `sub1`, only 1 message was received:
   
   ```
   2021-01-28 15:53:32.400 INFO  [0x1031aedc0] SampleConsumer:49 | Receive: my-key => msg-9 from (10,9,-1,0)
   ```
    
   4. Produce another 10 messages.
   5. Run consumer with `sub2`, 11 messages were received:
   
   ```
   2021-01-28 15:54:06.022 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-9 from (10,9,-1,0)
   2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-0 from (10,10,-1,0)
   2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-1 from (10,11,-1,0)
   2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-2 from (10,12,-1,0)
   2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-3 from (10,13,-1,0)
   2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-4 from (10,14,-1,0)
   2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-5 from (10,15,-1,0)
   2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-6 from (10,16,-1,0)
   2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-7 from (10,17,-1,0)
   2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-8 from (10,18,-1,0)
   2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-9 from (10,19,-1,0)
   ```
   
   I think the behavior is normal. Or did I just misunderstand?
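
The counts in this test (1 message for `sub1`, 11 for `sub2`) are what a simple model of compaction would predict: a `readCompacted` consumer gets the last value per key for everything up to the point where compaction last ran, plus every message produced after that point. The sketch below is a self-contained toy model of that idea, not Pulsar's actual implementation; `Msg`, `compactedView`, and the `horizon` parameter are names invented for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Toy model only: one key/value pair per message, positions are vector indices.
struct Msg {
    std::string key;
    std::string value;
};

// What a fresh readCompacted subscription would see in this model:
// the last value per key among log[0, horizon), followed by every
// message at or after `horizon` (the part compaction has not covered yet).
std::vector<Msg> compactedView(const std::vector<Msg>& log, std::size_t horizon) {
    assert(horizon <= log.size());

    // Index of the last occurrence of each key before the horizon.
    std::map<std::string, std::size_t> last;
    for (std::size_t i = 0; i < horizon; ++i) {
        last[log[i].key] = i;
    }

    std::vector<Msg> out;
    // Compacted part: keep only the surviving entry per key, in log order.
    for (std::size_t i = 0; i < horizon; ++i) {
        if (last[log[i].key] == i) {
            out.push_back(log[i]);
        }
    }
    // Uncompacted tail: delivered as-is.
    for (std::size_t i = horizon; i < log.size(); ++i) {
        out.push_back(log[i]);
    }
    return out;
}
```

With 10 messages for `my-key` and `horizon = 10`, the view holds 1 message. Appending 10 more without moving the horizon gives 11, and moving the horizon to 20 (that is, compacting again) brings it back to 1.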





[GitHub] [pulsar] BewareMyPower commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768748293


   @fayce66 Sorry it's a new API. For the old client, you can use
   
   ```c++
           result = producer.send(msg);
           if (result == ResultOk) {
               LOG_INFO("Send " << msg << " to " << msg.getMessageId());
   ```
   
   In addition, I suspect this is not a client-side problem; your compaction may simply have failed. You can run a Java consumer to see whether it works. Could you check your broker logs from when you ran the topic compaction? Here are my logs:
   
   ```
   10:12:07.084 [pulsar-web-67-12] INFO  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Trigger compaction on topic persistent://public/default/xyz-topic
   10:12:07.095 [ForkJoinPool.commonPool-worker-4] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config:
   // ...
   10:12:07.103 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650]] Connected to server
   10:12:07.099 [ForkJoinPool.commonPool-worker-4] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
   // ...
   10:12:07.103 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650]] Connected to server
   10:12:07.106 [ForkJoinPool.commonPool-worker-4] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [28/一月/2021:10:12:07 +0800] "PUT /admin/v2/persistent/public/default/xyz-topic/compaction HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.8.0-SNAPSHOT" 24
   10:12:07.106 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:50650
   10:12:07.108 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribing to topic on cnx [id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650], consumerId 0
   10:12:07.108 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Subscribing on topic persistent://public/default/xyz-topic / __compaction
   10:12:07.108 [pulsar-io-50-12] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] Cursor __compaction recovered to position 22:9
   10:12:07.109 [pulsar-io-50-12] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/xyz-topic] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[112, 117, 98, 108, 105, 99, 47, 100, 101, 102, 97, 117, 108, 116, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 120, 121, 122, 45, 116, 111, 112, 105, 99], pulsar/cursor=[95, 95, 99, 111, 109, 112, 97, 99, 116, 105, 111, 110], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
   10:12:07.112 [main-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 23
   10:12:07.118 [BookKeeperClientWorker-OrderedExecutor-11-0] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [public/default/persistent/xyz-topic] [__compaction] Updating cursor info ledgerId=23 mark-delete=22:9
   10:12:07.119 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] Updated cursor __compaction with ledger id 23 md-position=22:9 rd-position=22:10
   10:12:07.119 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/xyz-topic] Opened new cursor: ManagedCursorImpl{ledger=public/default/persistent/xyz-topic, name=__compaction, ackPos=22:9, readPos=22:10}
   10:12:07.120 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic-__compaction] Rewind from 22:10 to 22:10
   10:12:07.120 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic] There are no replicated subscriptions on the topic
   10:12:07.120 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic][__compaction] Created new subscription for 0
   10:12:07.120 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Created subscription on topic persistent://public/default/xyz-topic / __compaction
   10:12:07.121 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
   10:12:07.123 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Seek subscription to message id -1:-1:-1
   10:12:07.125 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://public/default/xyz-topic, name=__compaction}, consumerId=0, consumerName=3ee4e, address=/127.0.0.1:50650}
   10:12:07.125 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/xyz-topic, name=__compaction}, consumerId=0, consumerName=3ee4e, address=/127.0.0.1:50650}
   10:12:07.126 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/xyz-topic][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
   10:12:07.126 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] Initiate reset position to 22:-1 on cursor __compaction
   10:12:07.127 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 0
   10:12:07.127 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/xyz-topic] [__compaction] Closed connection [id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
   10:12:07.130 [BookKeeperClientWorker-OrderedExecutor-11-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] reset position to 22:-1 before current read position 22:10 on cursor __compaction
   10:12:07.132 [BookKeeperClientWorker-OrderedExecutor-11-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] [persistent://public/default/xyz-topic][__compaction] Reset subscription to message id -1:-1
   10:12:07.133 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Successfully reset subscription to message id -1:-1:-1
   10:12:07.228 [pulsar-timer-99-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/xyz-topic] [__compaction] Reconnecting after timeout
   10:12:07.230 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribing to topic on cnx [id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650], consumerId 0
   10:12:07.230 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Subscribing on topic persistent://public/default/xyz-topic / __compaction
   10:12:07.231 [pulsar-io-50-12] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic-__compaction] Rewind from 22:-1 to 22:0
   10:12:07.231 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic] There are no replicated subscriptions on the topic
   10:12:07.231 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic][__compaction] Created new subscription for 0
   10:12:07.231 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Created subscription on topic persistent://public/default/xyz-topic / __compaction
   10:12:07.231 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
   10:12:07.239 [pulsar-external-listener-97-1] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic] [__compaction] Could not get connection while getLastMessageId -- Will try again in 100 ms
   10:12:07.240 [pulsar-external-listener-97-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Get topic last message Id
   10:12:07.243 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Successfully getLastMessageId 22:9
   10:12:07.243 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Get topic last message Id
   10:12:07.244 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Successfully getLastMessageId 22:9
   10:12:07.245 [pulsar-client-io-96-1] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase one of compaction for persistent://public/default/xyz-topic, reading to 22:9:-1:0
   10:12:07.256 [main-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 24
   10:12:07.256 [main-EventThread] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase two of compaction for persistent://public/default/xyz-topic, from 22:0:-1:-1 to 22:9:-1:-1, compacting 1 keys to ledger 24
   10:12:07.258 [main-EventThread] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Seek subscription to message id 22:0:-1:-1
   10:12:07.259 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://public/default/xyz-topic, name=__compaction}, consumerId=0, consumerName=3ee4e, address=/127.0.0.1:50650}
   10:12:07.259 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/xyz-topic, name=__compaction}, consumerId=0, consumerName=3ee4e, address=/127.0.0.1:50650}
   10:12:07.259 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/xyz-topic][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
   10:12:07.259 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 0
   10:12:07.259 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/xyz-topic] [__compaction] Closed connection [id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
   10:12:07.259 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] Initiate reset position to 22:0 on cursor __compaction
   10:12:07.261 [BookKeeperClientWorker-OrderedExecutor-11-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] reset position to 22:0 before current read position 22:10 on cursor __compaction
   10:12:07.261 [BookKeeperClientWorker-OrderedExecutor-11-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] [persistent://public/default/xyz-topic][__compaction] Reset subscription to message id 22:0
   10:12:07.261 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Successfully reset subscription to message id 22:0:-1:-1
   10:12:07.360 [pulsar-timer-99-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/xyz-topic] [__compaction] Reconnecting after timeout
   10:12:07.361 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribing to topic on cnx [id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650], consumerId 0
   10:12:07.361 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Subscribing on topic persistent://public/default/xyz-topic / __compaction
   10:12:07.362 [pulsar-io-50-12] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic-__compaction] Rewind from 22:0 to 22:0
   10:12:07.362 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic] There are no replicated subscriptions on the topic
   10:12:07.362 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic][__compaction] Created new subscription for 0
   10:12:07.362 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Created subscription on topic persistent://public/default/xyz-topic / __compaction
   10:12:07.362 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
   10:12:07.382 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Closing consumer: consumerId=0
   10:12:07.382 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/xyz-topic, name=__compaction}, consumerId=0, consumerName=3ee4e, address=/127.0.0.1:50650}
   10:12:07.382 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Closed consumer, consumerId=0
   10:12:07.383 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic] [__compaction] Closed consumer
   ```
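   
   The `TwoPhaseCompactor` lines in the log above mention a "phase one" that reads up to `22:9` and a "phase two" that writes 1 key to a new ledger. As a rough illustration only, here is a minimal in-memory sketch of that two-pass idea. `Entry`, `phaseOne`, `phaseTwo` and `compactLedger` are hypothetical names made up for this sketch; it is not the broker's actual code.
   
   ```cpp
   #include <cstddef>
   #include <string>
   #include <unordered_map>
   #include <vector>
   
   // Hypothetical stand-in for one message stored in a topic ledger.
   struct Entry {
       std::size_t entryId;
       std::string key;
       std::string value;
   };
   
   // Phase one: scan the ledger once and record, per key, the id of the
   // latest entry that carries that key.
   std::unordered_map<std::string, std::size_t> phaseOne(const std::vector<Entry>& ledger) {
       std::unordered_map<std::string, std::size_t> latest;
       for (const auto& e : ledger) {
           latest[e.key] = e.entryId;
       }
       return latest;
   }
   
   // Phase two: read the ledger again and copy only the entries selected in
   // phase one into a new "compacted ledger", preserving their order.
   std::vector<Entry> phaseTwo(const std::vector<Entry>& ledger,
                               const std::unordered_map<std::string, std::size_t>& latest) {
       std::vector<Entry> compacted;
       for (const auto& e : ledger) {
           if (latest.at(e.key) == e.entryId) {
               compacted.push_back(e);
           }
       }
       return compacted;
   }
   
   // Runs both phases over the entries that exist at the time of the call.
   std::vector<Entry> compactLedger(const std::vector<Entry>& ledger) {
       return phaseTwo(ledger, phaseOne(ledger));
   }
   ```
   
   Under these assumptions, passing ten entries that all use the key `my-key` to `compactLedger` yields a single entry holding the last value, which matches the "compacting 1 keys" line in the log.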





[GitHub] [pulsar] fayce66 commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768876195


   OK, so basically you now get the same results as me. Maybe it's my mistake, but I don't understand why topic compaction works like this. My understanding is that it should always return only the last message per key, period, even if more messages have been produced in the meantime; that is the whole point. The consumer is not interested in past messages, only in the latest values, like a last-value queue. It defeats the purpose of topic compaction if we have to receive all messages the first time and only the last one the second time. What do you think? Can you try restarting consumer sub2 a second time without producing more messages? You should receive only the last message now.
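   
   To make the "last-value queue" expectation concrete: one way to get that view regardless of when compaction last ran is to fold received messages by key on the consumer side. The sketch below is only an illustration. `LastValueCache` is a hypothetical helper, not part of the Pulsar client; in a real consumer loop the key and value would presumably come from `msg.getPartitionKey()` and `msg.getDataAsString()`.
   
   ```cpp
   #include <map>
   #include <string>
   
   // Hypothetical helper (not a Pulsar API): keeps only the latest value
   // seen for each message key.
   class LastValueCache {
   public:
       // Call once per received message; a later value overwrites an earlier one.
       void onMessage(const std::string& key, const std::string& value) {
           latest_[key] = value;
       }
   
       // Current last-value view, ordered by key.
       const std::map<std::string, std::string>& snapshot() const {
           return latest_;
       }
   
   private:
       std::map<std::string, std::string> latest_;
   };
   ```
   
   With this, it does not matter whether the consumer received one compacted message or the full history for a key; the snapshot ends up the same.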





[GitHub] [pulsar] BewareMyPower edited a comment on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768882909


   > What do you mean by 'compact the topic' after producing 10 messages? you mean you run the pulsar-admin compact command again?
   
   Yeah, the command just triggers a manual compaction of old messages. A manual compaction only compacts the messages that already exist; it does not re-compact all old messages whenever a new message arrives, which would be inefficient. The main purpose of topic compaction is to reduce storage.
   
   If you have a consumer subscribed to a topic, then even if no compaction was done, the consumer will always read the latest message, because each time a message is acknowledged the consume position is persisted as a **cursor**. However, in scenarios like the one you described, once a newer message has been read, the older messages are no longer meaningful. We don't need to store them any more, so we can compact the topic, since those messages are useless to a new subscription.
   
   See http://pulsar.apache.org/docs/en/concepts-topic-compaction/. There are two ways to trigger topic compaction:
   
   > Triggered automatically when the backlog reaches a certain size or can be triggered manually via the command line. See the Topic compaction cookbook
   
   The command just triggers compaction manually. I think what you want is a compaction every time a message arrives. That is unnecessary unless you never acknowledge any messages and restart consuming, or you frequently use a new subscription.
   
   One suggestion is to use a namespace-level or topic-level policy for topic compaction.
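   
   The behavior discussed in this thread can be modeled with a small toy, assuming a new subscription that reads compacted data sees the result of the last compaction followed by everything produced after it. `ToyTopic` and its methods are hypothetical names for this sketch only; it is not the Pulsar API and does not talk to a broker.
   
   ```cpp
   #include <cstddef>
   #include <map>
   #include <string>
   #include <vector>
   
   struct Msg {
       std::string key;
       std::string value;
   };
   
   // Hypothetical toy model of a topic with manually triggered compaction.
   class ToyTopic {
   public:
       void produce(const std::string& key, const std::string& value) {
           log_.push_back({key, value});
       }
   
       // Models a manual trigger: keep the latest value per key for everything
       // produced so far and remember where that snapshot ends.
       void compact() {
           std::map<std::string, std::size_t> lastIndex;
           for (std::size_t i = 0; i < log_.size(); ++i) {
               lastIndex[log_[i].key] = i;
           }
           compacted_.clear();
           for (std::size_t i = 0; i < log_.size(); ++i) {
               if (lastIndex[log_[i].key] == i) {
                   compacted_.push_back(log_[i]);
               }
           }
           horizon_ = log_.size();
       }
   
       // What a brand-new subscription starting from the earliest position sees.
       std::vector<Msg> readFromEarliest(bool readCompacted) const {
           if (!readCompacted) {
               return log_;
           }
           // Compacted snapshot first, then the uncompacted tail after it.
           std::vector<Msg> out = compacted_;
           out.insert(out.end(),
                      log_.begin() + static_cast<std::ptrdiff_t>(horizon_),
                      log_.end());
           return out;
       }
   
   private:
       std::vector<Msg> log_;
       std::vector<Msg> compacted_;
       std::size_t horizon_ = 0;
   };
   ```
   
   In this model, producing 10 messages with one key, compacting, and then producing 10 more gives a compacted read of 11 messages (1 from the snapshot plus the 10 newer ones) until compaction is triggered again, after which it drops back to 1.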





[GitHub] [pulsar] fayce66 commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-769535894


   Can we close this issue, since you explained that Pulsar's behavior regarding topic compaction is normal?





[GitHub] [pulsar] fayce66 commented on issue #9317: pulsar c++ topic compaction with message key does not seem to work

Posted by GitBox <gi...@apache.org>.
fayce66 commented on issue #9317:
URL: https://github.com/apache/pulsar/issues/9317#issuecomment-768846704


   I don't see any option to start bin/pulsar-client with topic compaction. Am I wrong? I tried to test it this way, but the help message does not say how to set readCompacted to true:
   
   ```
   bin/pulsar-client --url "pulsar://marianas1:6650" consume "persistent://marianas/alphatrader/wing-calibration" -s "fek.sts" -n 0
   
   17:01:45.245 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xdf0a36c1, L:/10.0.3.6:48808 - R:marianas1/10.108.12.141:6650]] Connected to server
   17:01:45.809 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
     "topicNames" : [ "persistent://marianas/alphatrader/wing-calibration" ],
     "topicsPattern" : null,
     "subscriptionName" : "fek.sts",
     "subscriptionType" : "Exclusive",
     "subscriptionMode" : "Durable",
     "receiverQueueSize" : 1000,
     "acknowledgementsGroupTimeMicros" : 100000,
     "negativeAckRedeliveryDelayMicros" : 60000000,
     "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
     "consumerName" : null,
     "ackTimeoutMillis" : 0,
     "tickDurationMillis" : 1000,
     "priorityLevel" : 0,
     "maxPendingChuckedMessage" : 10,
     "autoAckOldestChunkedMessageOnQueueFull" : false,
     "expireTimeOfIncompleteChunkedMessageMillis" : 60000,
     "cryptoFailureAction" : "FAIL",
     "properties" : { },
     "readCompacted" : false,
     "subscriptionInitialPosition" : "Latest",
     "patternAutoDiscoveryPeriod" : 60,
     "regexSubscriptionMode" : "PersistentOnly",
     "deadLetterPolicy" : null,
     "retryEnable" : false,
     "autoUpdatePartitions" : true,
     "autoUpdatePartitionsIntervalSeconds" : 60,
     "replicateSubscriptionState" : false,
     "resetIncludeHead" : false,
     "keySharedPolicy" : null,
     "batchIndexAckEnabled" : false
   }
   17:01:45.814 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
     "serviceUrl" : "pulsar://marianas1:6650",
     "authPluginClassName" : null,
     "operationTimeoutMs" : 30000,
     "statsIntervalSeconds" : 60,
     "numIoThreads" : 1,
     "numListenerThreads" : 1,
     "connectionsPerBroker" : 1,
     "useTcpNoDelay" : true,
     "useTls" : false,
     "tlsTrustCertsFilePath" : "",
     "tlsAllowInsecureConnection" : false,
     "tlsHostnameVerificationEnable" : false,
     "concurrentLookupRequest" : 5000,
     "maxLookupRequest" : 50000,
     "maxLookupRedirects" : 20,
     "maxNumberOfRejectedRequestPerConnection" : 50,
     "keepAliveIntervalSeconds" : 30,
     "connectionTimeoutMs" : 10000,
     "requestTimeoutMs" : 60000,
     "initialBackoffIntervalNanos" : 100000000,
     "maxBackoffIntervalNanos" : 60000000000,
     "listenerName" : null,
     "useKeyStoreTls" : false,
     "sslProvider" : null,
     "tlsTrustStoreType" : "JKS",
     "tlsTrustStorePath" : "",
     "tlsTrustStorePassword" : "",
     "tlsCiphers" : [ ],
     "tlsProtocols" : [ ],
     "proxyServiceUrl" : null,
     "proxyProtocol" : null,
     "enableTransaction" : false
   }
   
   ```

