You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/07/08 09:05:05 UTC

[GitHub] [pulsar] sxpsxp12 opened a new issue #7479: support pulsar consumer keyshared in C++ And Cgo

sxpsxp12 opened a new issue #7479:
URL: https://github.com/apache/pulsar/issues/7479


   **Describe the bug**
   
   in pulsar 2.6.0,pulsar C++ client(newest) use keyshared policy,but consumer.receive() return timeout;when use other consumer type, it return pulsar data normally。
   Above issues apply to cgo
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Go to '...'
   2. Click on '....'
   3. Scroll down to '....'
   4. See error
   
   **Expected behavior**
   
   support keyshared consumer type in C++ And cgo
   
   **Screenshots**
   If applicable, add screenshots to help explain your problem.
   
   **Desktop (please complete the following information):**
    - OS: [e.g. iOS]
   
   **Additional context**
   pulsar broker: 2.6.0
   C++ pulsar client :2.6.0
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sxpsxp12 commented on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
sxpsxp12 commented on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-656067759


   Today I tried to locate the problem and found that when the producer enables compression(i use pulsar.LZ4)   to send data to pulsar broker(2.6.2 version)。 the consumer use keyshared policy consume data,No data received。when disable compression,consume work。or change consumption strategy,consumer work。


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-656239055


   @sxpsxp12 LZ4 compression is really a problem because the lz4.h/lz4.c is too old, you could also try to change compression to ZLib to watch if consume failed again.
   There's another issue that has mentioned LZ4 problem before, see [issue #6806](https://github.com/apache/pulsar/issues/6806), from the discussion you can see:
   
   > The version of LZ4 used in the current C++ client is 1.7.1 which is from back in 2015/2016 
   
   > I see that the Java client uses lz4-java 1.5.0, which is based on LZ4 1.8.3.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower closed issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
BewareMyPower closed issue #7479:
URL: https://github.com/apache/pulsar/issues/7479


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sxpsxp12 commented on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
sxpsxp12 commented on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-656068380


   producer options:
   `			Name:  "demo-producer-down",
   			Topic: topic,
   
   			CompressionType: pulsar.LZ4,
   			HashingScheme:   pulsar.Murmur3_32Hash,
   
   			MaxPendingMessages: 30000,
   
   			DisableBatching:         false,
   			BatchingMaxMessages:     1000,
   			BatchingMaxPublishDelay: 3000 * time.Millisecond,`
   
   Comsumer options:
   
   `
   			TopicsPattern:    "persistent://dataplatform/wsn/down_.*",
   			//TopicsPattern:    "persistent://public/default/demo.*",
   			SubscriptionName: "demo-mock-consumer",
   
   			SubscriptionInitialPosition: pulsar.SubscriptionPositionLatest,
   			Type:                        pulsar.Failover,
   
   			ReceiverQueueSize: 5000,
   			MessageChannel:    messageChan,`


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-657690737


   It looks like a problem of broker. I have tested following Java client (commit  c94067d) with broker 2.6.0 standalone:
    
   ```java
   import org.apache.pulsar.client.api.*;
   
   import java.util.Arrays;
   import java.util.concurrent.TimeUnit;
   
   public class ProducerDemo {
       public static void main(String[] args) {
           try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
               Consumer<String> consumer = client.newConsumer(Schema.STRING) //
                       .topic("ParTopic") //
                       .subscriptionName("my-sub") //
                       .subscriptionType(SubscriptionType.Key_Shared) //
                       .subscribe();
               Producer<String> producer = client.newProducer(Schema.STRING) //
                       .compressionType(CompressionType.LZ4) //
                       .topic("ParTopic") //
                       .create();
               MessageId id = producer.newMessage().value("hello world").send();
               System.out.println("send to " + Arrays.toString(id.toByteArray()));
               producer.close();
               Message<String> msg = consumer.receive(30, TimeUnit.SECONDS);
               System.out.println("receive " + new String(msg.getData()));
               consumer.close();
           } catch (PulsarClientException e) {
               e.printStackTrace();
           }
       }
   }
   ```
   
   It stuck at `consumer.receive` until timeout exceeds. The broker's log is
   
   ```
   01:23:14.206 [bookkeeper-ml-workers-OrderedExecutor-2-0] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught 
   java.lang.IndexOutOfBoundsException: readerIndex: 77, writerIndex: -268173235 (expected: 0 <= readerIndex <= writerIndex <= capacity(4096))
   	at io.netty.buffer.AbstractByteBuf.checkIndexBounds(AbstractByteBuf.java:112) ~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
   	at io.netty.buffer.AbstractByteBuf.writerIndex(AbstractByteBuf.java:135) ~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
   	at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1707) ~[org.apache.pulsar-pulsar-common-2.6.0.jar:2.6.0]
   	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.peekStickyKey(AbstractBaseDispatcher.java:161) ~[org.apache.pulsar-pulsar-broker-2.6.0.jar:2.6.0]
   	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:145) ~[org.apache.pulsar-pulsar-broker-2.6.0.jar:2.6.0]
   	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:476) ~[org.apache.pulsar-pulsar-broker-2.6.0.jar:2.6.0]
   	at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:152) ~[org.apache.pulsar-managed-ledger-2.6.0.jar:2.6.0]
   	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.6.0.jar:2.6.0]
   	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
   	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_251]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_251]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
   	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
   ```
   
   The worse thing is that this corrupted message (I guess) could affect following messages. After that, I changed `LZ4` to `NONE`, it still didn't work. But if I stopped the standalone, then delete `data` directory, and started the standalone again, `Key_Shared` with `Compression.NONE` worked.
   
   I didn't test with latest broker, did the latest broker solve this problem or just change some behaviour? @sijie 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-656051439


   I have added some unit tests about key shared consumer, see #7487 . `TEST_F(KeySharedConsumerTest, testMultiTopics)` is the test for regex subscription.
   
   Could you provide some detail about your code?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
sijie commented on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-655855699


   @sxpsxp12 Can you share a code example so that we can reproduce?
   
   Did you see any error logs at the broker side?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower edited a comment on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-656239055


   @sxpsxp12 LZ4 compression is really a problem because the lz4.h/lz4.c is too old, you could also try to switch compression to ZLib to watch if consume failed again.
   There's another issue [#6806](https://github.com/apache/pulsar/issues/6806) that has mentioned LZ4 problem before, from the discussion you can see:
   
   > The version of LZ4 used in the current C++ client is 1.7.1 which is from back in 2015/2016 
   
   > I see that the Java client uses lz4-java 1.5.0, which is based on LZ4 1.8.3.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sxpsxp12 commented on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
sxpsxp12 commented on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-656992587


   **The test results are the same with versions 2.5.2 cgo client and 2.6.0 cgo client**
   
   # Organized the following information
   ## Work 1
   1. producer config
   
   ```
   	producer, err := pulsar_client.SingletonPulsarClient().
   		CreateProducer(pulsar.ProducerOptions{
   			Name:  "demo-producer-down-cgo",
   			Topic: topic,
   
   			CompressionType: pulsar.LZ4,    //or pulsar.ZLib or pulsar.ZSTD
   			HashingScheme:   pulsar.Murmur3_32Hash,
   
   			MaxPendingMessages: 30000,
   
   			Batching:                true,
   			BatchingMaxMessages:     1000,
   			BatchingMaxPublishDelay: 3000 * time.Millisecond,
   		})
   ```
   
   2. consumer config
   
   ```
   	var err error
   	consumer, err = pulsar_client.SingletonPulsarClient().
   		CreateConsumer(pulsar.ConsumerOptions{
   			TopicsPattern:    "persistent://dataplatform/wsn/down_.*",
   			//TopicsPattern:    "persistent://public/default/demo.*",
   			SubscriptionName: "demo-mock-consumer-cgo",
   
   			SubscriptionInitPos: pulsar.Latest,
   			Type:                pulsar.Failover,
   
   			ReceiverQueueSize: 5000,
   			MessageChannel:    messageChan,
   		})
   ```
   
   ## Work 2
   
   1. producer config
   
   ```
   	producer, err := pulsar_client.SingletonPulsarClient().
   		CreateProducer(pulsar.ProducerOptions{
   			Name:  "demo-producer-down-cgo",
   			Topic: topic,
   
   			CompressionType: pulsar.ZSTD,  //or pulsar.ZLib or pulsar pulsar.LZ4
   			HashingScheme:   pulsar.Murmur3_32Hash,
   
   			MaxPendingMessages: 30000,
   
   			Batching:                false,
   			BatchingMaxMessages:     1000,
   			BatchingMaxPublishDelay: 3000 * time.Millisecond,
   		})
   ```
   
   2. consumer config
   
   ```
   	var err error
   	consumer, err = pulsar_client.SingletonPulsarClient().
   		CreateConsumer(pulsar.ConsumerOptions{
   			TopicsPattern:    "persistent://dataplatform/wsn/down_.*",
   			//TopicsPattern:    "persistent://public/default/demo.*",
   			SubscriptionName: "demo-mock-consumer-cgo",
   
   			SubscriptionInitPos: pulsar.Latest,
   			Type:                pulsar.KeyShared,   //or pulsar.Failover
   
   			ReceiverQueueSize: 5000,
   			MessageChannel:    messageChan,
   		})
   ```
   
   
   
   ## No Work1
   1. producer config
   
   ```
   	producer, err := pulsar_client.SingletonPulsarClient().
   		CreateProducer(pulsar.ProducerOptions{
   			Name:  "demo-producer-down-cgo",
   			Topic: topic,
   
   			CompressionType: pulsar.LZ4, //or pulsar.ZLib or pulsar.ZSTD
   			HashingScheme:   pulsar.Murmur3_32Hash,
   
   			MaxPendingMessages: 30000,
   
   			Batching:                true,
   			BatchingMaxMessages:     1000,
   			BatchingMaxPublishDelay: 3000 * time.Millisecond,
   		})
   ```
   2. consumer config
   
   ```
   	var err error
   	consumer, err = pulsar_client.SingletonPulsarClient().
   		CreateConsumer(pulsar.ConsumerOptions{
   			TopicsPattern:    "persistent://dataplatform/wsn/down_.*",
   			//TopicsPattern:    "persistent://public/default/demo.*",
   			SubscriptionName: "demo-mock-consumer-cgo",
   
   			SubscriptionInitPos: pulsar.Latest,
   			Type:                pulsar.KeyShared,
   
   			ReceiverQueueSize: 5000,
   			MessageChannel:    messageChan,
   		})
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sxpsxp12 commented on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
sxpsxp12 commented on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-655444381


   remarks: C++ consumer  use subscribeWithRegex() 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sxpsxp12 commented on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
sxpsxp12 commented on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-656974919


   I think the problem is not the type of compression, I switch compression to other, still does not work


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sxpsxp12 commented on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
sxpsxp12 commented on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-655400511


   pulsar broker change to 2.5.2, keyshared work ok;so, it is broker‘s problem? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-657693308


   Just now I saw the [PR #7416](https://github.com/apache/pulsar/pull/7416) that could fix this bug.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sxpsxp12 removed a comment on issue #7479: support pulsar consumer keyshared in C++ And Cgo

Posted by GitBox <gi...@apache.org>.
sxpsxp12 removed a comment on issue #7479:
URL: https://github.com/apache/pulsar/issues/7479#issuecomment-656974919


   I think the problem is not the type of compression, I switch compression to other, still does not work


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org