You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/06/03 10:21:20 UTC

[GitHub] [pulsar] wolfstudy opened a new issue #4444: [cgo client] Shared subscribe type error

wolfstudy opened a new issue #4444: [cgo client] Shared subscribe type error
URL: https://github.com/apache/pulsar/issues/4444
 
 
   **Describe the bug**
   
   go test:
   
   ```
   func TestConsumerShared(t *testing.T) {
   	client, err := NewClient(ClientOptions{
   		URL: "pulsar://localhost:6650",
   	})
   	assert.Nil(t, err)
   	defer client.Close()
   
   	topic := "persistent://public/default/multi-topic-1"
   
   	consumer1, err := client.Subscribe(ConsumerOptions{
   		Topic:            topic,
   		SubscriptionName: "key-sub3",
   		Type:             Shared,
   	})
   	assert.Nil(t, err)
   	defer consumer1.Close()
   
   	consumer2, err := client.Subscribe(ConsumerOptions{
   		Topic:            topic,
   		SubscriptionName: "key-sub3",
   		Type:             Shared,
   	})
   	assert.Nil(t, err)
   	defer consumer2.Close()
   
   	// create producer
   	producer, err := client.CreateProducerWithSchema(ProducerOptions{
   		Topic:    topic,
   		Batching: false,
   	}, NewBytesSchema(nil))
   	assert.Nil(t, err)
   	defer producer.Close()
   
   	ctx := context.Background()
   	for i := 0; i < 20; i++ {
   		producer.SendAsync(ctx, ProducerMessage{
   			Key:   fmt.Sprintf("key-%d", i%3),
   			Value: []byte(fmt.Sprintf("value-%d", i)),
   		}, func(producerMessage ProducerMessage, e error) {
   			fmt.Printf("send complete. err = %v \n", e)
   		})
   	}
   
   	for i := 0; i < 10; i++ {
   		msg, err := consumer1.Receive(ctx)
   		fmt.Printf("consumer1 key is: %s, value is: %s\n", msg.Key(), string(msg.Payload()))
   		assert.Nil(t, err)
   		err = consumer1.Ack(msg)
   		assert.Nil(t, err)
   	}
   
   	for i := 0; i < 10; i++ {
   		msg2, err := consumer2.Receive(ctx)
   		assert.Nil(t, err)
   		fmt.Printf("consumer2 key is:%s, value is: %s\n", msg2.Key(), string(msg2.Payload()))
   		err = consumer2.Ack(msg2)
   		assert.Nil(t, err)
   	}
   }
   ```
   
   cpp test:
   
   ```
   TEST(ConsumerConfigurationTest, testSubscribePersistentMultiConsumerKeyShared) {
       std::string lookupUrl = "pulsar://localhost:6650";
       std::string topicName = "persist-key-shared-topic-1";
       std::string subName = "test-persist-key-shared";
   
       ClientConfiguration clientConfig;
       Client client(lookupUrl, clientConfig);
   
       Result result;
   
       LOG_INFO("create 1 producer...");
       Producer producer;
       ProducerConfiguration conf;
       result = client.createProducer(topicName, conf, producer);
       ASSERT_EQ(ResultOk, result);
   
       std::string content1 = "msg-1-content-1";
       Message msg = MessageBuilder().setContent(content1).build();
       result = producer.send(msg);
       ASSERT_EQ(ResultOk, result);
   
       std::string content2 = "msg-2-content-2";
       msg = MessageBuilder().setContent(content2).build();
       result = producer.send(msg);
       ASSERT_EQ(ResultOk, result);
   
       ConsumerConfiguration config;
       config.setReadCompacted(false);
       config.setConsumerType(ConsumerShared);
       config.setSubscriptionInitialPosition(InitialPosition::InitialPositionEarliest);
   
       Consumer consumer;
       result = client.subscribe(topicName, subName, config, consumer);
       ASSERT_EQ(ResultOk, result);
       Consumer consumer1;
       result = client.subscribe(topicName, subName, config, consumer1);
       ASSERT_EQ(ResultOk, result);
   
       Message receivedMsg;
   
       result = consumer.receive(receivedMsg, 2000);
       ASSERT_EQ(ResultOk, result);
       ASSERT_EQ(content1, receivedMsg.getDataAsString());
   
       result = consumer1.receive(receivedMsg, 2000);
       ASSERT_EQ(ResultOk, result);
       ASSERT_EQ(content2, receivedMsg.getDataAsString());
   
       consumer.close();
       consumer1.close();
   }
   ```
   
   result output `timeout`, as follows:
   
   ```
   2019-06-03 09:54:21.142 INFO  ConsumerImpl:170 | [persistent://public/default/persist-key-shared-topic-1, test-persist-key-shared, 1] Created consumer on broker [127.0.0.1:50264 -> 127.0.0.1:6650]
   /pulsar/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc:138: Failure
   Value of: result
     Actual: TimeOut
   Expected: ResultOk
   Which is: Ok
   2019-06-03 09:54:23.171 WARN  ConsumerImpl:98 | [persistent://public/default/persist-key-shared-topic-1, test-persist-key-shared, 1] Destroyed consumer which was not properly closed
   2019-06-03 09:54:23.171 INFO  ConsumerImpl:839 | [persistent://public/default/persist-key-shared-topic-1, test-persist-key-shared, 1] Closing consumer for topic persistent://public/default/persist-key-shared-topic-1
   2019-06-03 09:54:23.172 WARN  ConsumerImpl:98 | [persistent://public/default/persist-key-shared-topic-1, test-persist-key-shared, 0] Destroyed consumer which was not properly closed
   2019-06-03 09:54:23.172 INFO  ConsumerImpl:839 | [persistent://public/default/persist-key-shared-topic-1, test-persist-key-shared, 0] Closing consumer for topic persistent://public/default/persist-key-shared-topic-1
   [  FAILED  ] ConsumerConfigurationTest.testSubscribePersistentMultiConsumerKeyShared (3899 ms)
   [----------] 1 test from ConsumerConfigurationTest (3899 ms total)
   
   [----------] Global test environment tear-down
   [==========] 1 test from 1 test case ran. (3900 ms total)
   [  PASSED  ] 0 tests.
   [  FAILED  ] 1 test, listed below:
   [  FAILED  ] ConsumerConfigurationTest.testSubscribePersistentMultiConsumerKeyShared
   
    1 FAILED TEST
   [1/1] ConsumerConfigurationTest.testSubscribePersistentMultiConsumerKeyShared returned/aborted with exit code 1 (3946 ms)
   FAILED TESTS (1/1):
       3946 ms: ./main ConsumerConfigurationTest.testSubscribePersistentMultiConsumerKeyShared
   ```
   
   **To Reproduce**
   
   copy the code above, and run it.
   
   **Expected behavior**
   
   In go test code, `consumer1` and `consumer2` each receive 10 messages. In fact, only `consumer1` receives 10 messages, and then remains blocked, and `consumer2` does not receive any messages.
   
   In cpp test code, `consumer` and `consumer1` each receive 1 messages. In fact, only `consumer` receives 1 message, and then `consumer1` receives timeout.
   
   In my test scenario, cpp and go behave in the same way, so I guess this may be a bug in cpp. For the same topic, start multiple consumers to share with the shared subscription type, only one consumer can receive To data.
   
   ```
   $ ./bin/pulsar-admin topics stats [topic-name]
   ```
   
   ![image](https://user-images.githubusercontent.com/20965307/58794973-17f31380-862c-11e9-9925-c9a23eade7c3.png)
   
   Using the `stats` command, you can see that all messages are sent to the same consumer under the shared subscription type.
   
   
   **Desktop (please complete the following information):**
    - OS: Mac ios
    - go version: 1.11.4
    - pulsar version: latest
   
   **Additional context**
   Add any other context about the problem here.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services