You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/03 08:04:16 UTC

[GitHub] [pulsar] become-nice opened a new issue #7726: Consumer can't receive message from topic

become-nice opened a new issue #7726:
URL: https://github.com/apache/pulsar/issues/7726


   **Describe the bug**
   I create a topic with 4 partitions.When I use key shared mode to consumer message,sometimes, I can't receive any message from a specify partition.In this scenes,I create two consumers, one of it subscribe the topic(C1), one of it subscribe two partitions of the same topic(C2). I start C1 first, then C2. At first, C1 can receive all message,C2 can't receive any message.When I close C1, C2 can receive message from specify partitions. But when I start C1 again, all consumer can't recevie from the partitions those C2  subscribed.
   
   The code of produce
   ```python
   import pulsar
   import time
   from multiprocessing import Process
   
   
   def produce_test(partition, key, message):
       client = pulsar.Client('pulsar://localhost:6650')
       producer = client.create_producer(partition)
       print(client.get_topic_partitions("test10"))
       for i in range(240):
           time.sleep(2)
           message_mix = str(int(i)) + message
           print(message_mix)
           producer.send((message_mix).encode('utf-8'), partition_key=key)
       time.sleep(1000)
       client.close()
   
   
   if __name__ == "__main__":
       p1 = Process(target=produce_test, args=("test10", "10000", "aaaa",))
       p2 = Process(target=produce_test, args=("test10", "20000", "bbbb",))
       p3 = Process(target=produce_test, args=("test10", "30000", "cccc",))
       p4 = Process(target=produce_test, args=("test10", "40000", "dddd",))
       p1.start()
       p2.start()
       p3.start()
       p4.start()
   ```
   
   The code of C1:
   ```python
   import pulsar
   import time
   from _pulsar import ConsumerType
   from multiprocessing import Process
   
   
   def consumer_data(topic, process_index):
       client = pulsar.Client('pulsar://localhost:6650')
       print(len(client._consumers))
       consumer = client.subscribe(topic, 'my-subscription', consumer_type=ConsumerType.KeyShared)
       flag = True
       start_time = int(time.time() * 1000)
       while True:
           msg = consumer.receive()
           print(
               "Process {} Received message '{}' id='{}' partition={}".format(process_index, msg.data(), msg.message_id(),
                                                                              msg.topic_name()))
           consumer.acknowledge(msg)
   
   
   if __name__ == "__main__":
       p1 = Process(target=consumer_data, args=("test10", "p1", ))
       p1.start()
   ```
   
   The code of C2
   ```python
   import pulsar
   import time
   from _pulsar import ConsumerType
   from multiprocessing import Process
   
   
   def consumer_data(topic, process_index,):
       client = pulsar.Client('pulsar://localhost:6650')
       print(len(client._consumers))
       consumer = client.subscribe(topic, 'my-subscription', consumer_type=ConsumerType.KeyShared)
       flag = True
       start_time = int(time.time() * 1000)
       while True:
           msg = consumer.receive()
           print(
               "Process {} Received message '{}' id='{}' partition={}".format(process_index, msg.data(), msg.message_id(),
                                                                              msg.topic_name()))
           consumer.acknowledge(msg)
   
   
   if __name__ == "__main__":
       p2 = Process(target=consumer_data, args=(["test10-partition-2", "test10-partition-3"], "2", ))
       p2.start()
   ```
   
   The output of C1:
   ```
   Process p1 Received message 'b'236dddd'' id='(15,236,0,-1)' partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'236cccc'' id='(17,472,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'236aaaa'' id='(17,473,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'237dddd'' id='(15,237,0,-1)' partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'237cccc'' id='(17,474,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'237aaaa'' id='(17,475,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'238dddd'' id='(15,238,0,-1)' partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'238cccc'' id='(17,476,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'238aaaa'' id='(17,477,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'239dddd'' id='(15,239,0,-1)' partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'239cccc'' id='(17,478,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'239aaaa'' id='(17,479,1,-1)' partition=persistent://public/default/test10-partition-1
   2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-0, my-subscription, 0] , ConsumerStatsImpl (numBytesRecieved_ = 1535, totalNumBytesRecieved_ = 1535, receivedMsgMap_ = {[Key: Ok, Value: 233], }, ackedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 233], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 233], }, totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 233], })
   2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-1, my-subscription, 1] , ConsumerStatsImpl (numBytesRecieved_ = 3070, totalNumBytesRecieved_ = 3070, receivedMsgMap_ = {[Key: Ok, Value: 466], }, ackedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 466], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 466], }, totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 466], })
   2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-2, my-subscription, 2] , ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, totalAckedMsgMap_ = {})
   2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-3, my-subscription, 3] , ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, totalAckedMsgMap_ = {})
   ```
   
   The output of C2:
   ```
   2020-08-03 15:49:46.326 INFO  [140382617937728] ConnectionPool:85 | Created connection for pulsar://localhost:6650
   2020-08-03 15:49:46.326 INFO  [140382567016192] ClientConnection:335 | [127.0.0.1:50638 -> 127.0.0.1:6650] Connected to broker
   2020-08-03 15:49:46.329 INFO  [140382567016192] AckGroupingTrackerEnabled:53 | ACK grouping is enabled, grouping time 100ms, grouping max size 1000
   2020-08-03 15:49:46.329 INFO  [140382567016192] HandlerBase:53 | [persistent://public/default/test10-partition-2, my-subscription, 0] Getting connection from pool
   2020-08-03 15:49:46.329 INFO  [140382567016192] AckGroupingTrackerEnabled:53 | ACK grouping is enabled, grouping time 100ms, grouping max size 1000
   2020-08-03 15:49:46.329 INFO  [140382567016192] HandlerBase:53 | [persistent://public/default/test10-partition-3, my-subscription, 1] Getting connection from pool
   2020-08-03 15:49:46.331 INFO  [140382567016192] ConsumerImpl:199 | [persistent://public/default/test10-partition-2, my-subscription, 0] Created consumer on broker [127.0.0.1:50638 -> 127.0.0.1:6650] 
   2020-08-03 15:49:46.331 INFO  [140382567016192] ConsumerImpl:199 | [persistent://public/default/test10-partition-3, my-subscription, 1] Created consumer on broker [127.0.0.1:50638 -> 127.0.0.1:6650] 
   2020-08-03 15:49:46.331 INFO  [140382567016192] MultiTopicsConsumerImpl:99 | Successfully Subscribed to Topics
   2020-08-03 15:59:46.330 INFO  [140382567016192] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-2, my-subscription, 0] , ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, totalAckedMsgMap_ = {})
   2020-08-03 15:59:46.330 INFO  [140382567016192] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-3, my-subscription, 1] , ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, totalAckedMsgMap_ = {})
   
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on issue #7726: Consumer can't receive message from topic

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #7726:
URL: https://github.com/apache/pulsar/issues/7726#issuecomment-668942839


   The point is, at the first step:
   
   > I start C1 first, then C2. 
   
   C2 could consume messages as the log says. Also I use Ctrl+C to kill the process, Even without it, the `client.close()` could never been reached because there's no way to break my loop.
   
   The full log of C2 (while C1 has started before) is:
   
   ```
   $ ./examples/SampleConsumer 2 3
   2020-08-05 10:34:23.403 INFO  [140077698222624] ConnectionPool:85 | Created connection for pulsar://localhost:6650
   2020-08-05 10:34:23.405 INFO  [140077624579840] ClientConnection:343 | [127.0.0.1:59990 -> 127.0.0.1:6650] Connected to broker
   2020-08-05 10:34:23.415 INFO  [140077624579840] HandlerBase:53 | [persistent://public/default/ParTopic-partition-3, SubscriptionName, 0] Getting connection from pool
   2020-08-05 10:34:23.417 INFO  [140077624579840] HandlerBase:53 | [persistent://public/default/ParTopic-partition-2, SubscriptionName, 1] Getting connection from pool
   2020-08-05 10:34:23.417 INFO  [140077624579840] ConsumerImpl:199 | [persistent://public/default/ParTopic-partition-3, SubscriptionName, 0] Created consumer on broker [127.0.0.1:59990 -> 127.0.0.1:6650] 
   2020-08-05 10:34:23.418 INFO  [140077624579840] ConsumerImpl:199 | [persistent://public/default/ParTopic-partition-2, SubscriptionName, 1] Created consumer on broker [127.0.0.1:59990 -> 127.0.0.1:6650] 
   2020-08-05 10:34:23.418 INFO  [140077624579840] MultiTopicsConsumerImpl:99 | Successfully Subscribed to Topics
   [2] 160bbbb, key: 20000, id: (45337,159,-1,0)
   // ...
   [2] 176bbbb, key: 20000, id: (45337,175,-1,0)
   ^C
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on issue #7726: Consumer can't receive message from topic

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #7726:
URL: https://github.com/apache/pulsar/issues/7726#issuecomment-669319484


   Here're the outputs of Python client 2.6.0 with broker 2.6.0, using the same code as provided. First start `Producer.py` and `C1.py`, then start `C2.py`.
   
   C1:
   
   ```
   $ python3 C1.py
   0
   2020-08-05 09:55:18.379 INFO  [140237624133440] Client:88 | Subscribing on Topic :test10
   2020-08-05 09:55:18.381 INFO  [140237624133440] ConnectionPool:85 | Created connection for pulsar://localhost:6650
   2020-08-05 09:55:18.383 INFO  [140237353412352] ClientConnection:343 | [127.0.0.1:50298 -> 127.0.0.1:6650] Connected to broker
   2020-08-05 09:55:18.412 INFO  [140237353412352] HandlerBase:53 | [persistent://public/default/test10-partition-0, my-subscription, 0] Getting connection from pool
   2020-08-05 09:55:18.413 INFO  [140237353412352] HandlerBase:53 | [persistent://public/default/test10-partition-1, my-subscription, 1] Getting connection from pool
   2020-08-05 09:55:18.413 INFO  [140237353412352] HandlerBase:53 | [persistent://public/default/test10-partition-2, my-subscription, 2] Getting connection from pool
   2020-08-05 09:55:18.413 INFO  [140237353412352] HandlerBase:53 | [persistent://public/default/test10-partition-3, my-subscription, 3] Getting connection from pool
   2020-08-05 09:55:18.582 INFO  [140237353412352] ConsumerImpl:199 | [persistent://public/default/test10-partition-3, my-subscription, 3] Created consumer on broker [127.0.0.1:50298 -> 127.0.0.1:6650] 
   2020-08-05 09:55:18.582 INFO  [140237353412352] ConsumerImpl:199 | [persistent://public/default/test10-partition-1, my-subscription, 1] Created consumer on broker [127.0.0.1:50298 -> 127.0.0.1:6650] 
   2020-08-05 09:55:18.582 INFO  [140237353412352] ConsumerImpl:199 | [persistent://public/default/test10-partition-2, my-subscription, 2] Created consumer on broker [127.0.0.1:50298 -> 127.0.0.1:6650] 
   2020-08-05 09:55:18.582 INFO  [140237353412352] ConsumerImpl:199 | [persistent://public/default/test10-partition-0, my-subscription, 0] Created consumer on broker [127.0.0.1:50298 -> 127.0.0.1:6650] 
   2020-08-05 09:55:18.582 INFO  [140237353412352] PartitionedConsumerImpl:302 | Successfully Subscribed to Partitioned Topic - persistent://public/default/test10 with - 4 Partitions.
   Process p1 Received message 'b'0bbbb'' id='(14,0,3,-1)' partition=persistent://public/default/test10-partition-3
   Process p1 Received message 'b'0aaaa'' id='(15,0,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'0dddd'' id='(13,0,0,-1)' partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'0cccc'' id='(15,1,1,-1)' partition=persistent://public/default/test10-partition-1
   # messagess of 2~8...
   Process p1 Received message 'b'9dddd'' id='(13,9,0,-1)' partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'9cccc'' id='(15,18,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'9aaaa'' id='(15,19,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'10cccc'' id='(15,20,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'10dddd'' id='(13,10,0,-1)' partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'10aaaa'' id='(15,21,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'11dddd'' id='(13,11,0,-1)' partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'11cccc'' id='(15,22,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'11aaaa'' id='(15,23,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'12dddd'' id='(13,12,0,-1)' partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'12cccc'' id='(15,24,1,-1)' partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'12aaaa'' id='(15,25,1,-1)' partition=persistent://public/default/test10-partition-1
   ```
   
   C2:
   
   ```
   $ python3 C2.py 
   0
   2020-08-05 09:57:05.193 INFO  [139998113343296] ConnectionPool:85 | Created connection for pulsar://localhost:6650
   2020-08-05 09:57:05.195 INFO  [139997834229504] ClientConnection:343 | [127.0.0.1:50328 -> 127.0.0.1:6650] Connected to broker
   2020-08-05 09:57:05.206 INFO  [139997834229504] HandlerBase:53 | [persistent://public/default/test10-partition-3, my-subscription, 0] Getting connection from pool
   2020-08-05 09:57:05.210 INFO  [139997834229504] HandlerBase:53 | [persistent://public/default/test10-partition-2, my-subscription, 1] Getting connection from pool
   2020-08-05 09:57:05.215 INFO  [139997834229504] ConsumerImpl:199 | [persistent://public/default/test10-partition-3, my-subscription, 0] Created consumer on broker [127.0.0.1:50328 -> 127.0.0.1:6650] 
   2020-08-05 09:57:05.221 INFO  [139997834229504] ConsumerImpl:199 | [persistent://public/default/test10-partition-2, my-subscription, 1] Created consumer on broker [127.0.0.1:50328 -> 127.0.0.1:6650] 
   2020-08-05 09:57:05.221 INFO  [139997834229504] MultiTopicsConsumerImpl:99 | Successfully Subscribed to Topics
   Process 2 Received message 'b'9bbbb'' id='(14,9,-1,-1)' partition=persistent://public/default/test10-partition-3
   Process 2 Received message 'b'10bbbb'' id='(14,10,-1,-1)' partition=persistent://public/default/test10-partition-3
   Process 2 Received message 'b'11bbbb'' id='(14,11,-1,-1)' partition=persistent://public/default/test10-partition-3
   Process 2 Received message 'b'12bbbb'' id='(14,12,-1,-1)' partition=persistent://public/default/test10-partition-3
   ```
   
   Besides, I noticed your log has `AckGroupingTrackerEnabled`, this feature was improved from #6534 but the log level was reduced to debug from #7373 , could you try with latest client or just rollback to 2.6.0?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on issue #7726: Consumer can't receive message from topic

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #7726:
URL: https://github.com/apache/pulsar/issues/7726#issuecomment-668380283


   I wrote a simple C++ consumer but could not reproduce the problem:
   
   ```c++
   // SampleConsumer.cc in examples directory
   #include <iostream>
   #include <pulsar/Client.h>
   
   using namespace std;
   using namespace pulsar;
   
   int main(int argc, char* argv[]) {
       std::string topic = "ParTopic";  // a topic with 4 partitions
       std::vector<std::string> partitions;
       for (int i = 1; i < argc; i++) {
           partitions.emplace_back(topic + "-partition-" + argv[i]);
       }
       size_t index = partitions.size();
   
       Client client("pulsar://localhost:6650");
   
       ConsumerConfiguration consumerConfig;
       consumerConfig.setConsumerType(ConsumerKeyShared);
   
       std::string subName = "SubscriptionName";
       Consumer consumer;
       if (partitions.empty()) {
           client.subscribe(topic, subName, consumerConfig, consumer);
       } else {
           client.subscribe(partitions, subName, consumerConfig, consumer);
       }
       Message msg;
   
       while (true) {
           Result result = consumer.receive(msg);
           cout << "[" << index << "] " << msg.getDataAsString() << ", key: " << msg.getPartitionKey()
                << ", id: " << msg.getMessageId() << endl;
           consumer.acknowledge(msg);
       }
       client.close();
   }
   ```
   
   First run a producer, and a consumer subscribing the partitioned topic (C1):
   
   ```
   $ ./SampleConsumer
   ```
   
   Then run a consumer subscribing two partitions (C2), eg, 0 and 2:
   
   ```
   $ ./SampleConsumer 2 3
   ```
   
   The log:
   
   #### partition 0 and 1
   
   C1:
   
   ```
   [0] 11bbbb, key: 20000, id: (42970,476,3,0)
   [0] 11aaaa, key: 10000, id: (43000,950,1,0)
   ```
   
   C2:
   
   ```
   [2] 11dddd, key: 40000, id: (43001,472,-1,0)
   [2] 11cccc, key: 30000, id: (43000,949,-1,0)
   ```
   
   #### partition 2 and 3
   
   C1:
   
   ```
   [0] 0cccc, key: 30000, id: (43000,955,1,0)
   [0] 0aaaa, key: 10000, id: (43000,956,1,0)
   [0] 0dddd, key: 40000, id: (43001,475,0,0)
   ```
   
   C2:
   
   ```
   [2] 0bbbb, key: 20000, id: (42970,479,-1,0)
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] become-nice commented on issue #7726: Consumer can't receive message from topic

Posted by GitBox <gi...@apache.org>.
become-nice commented on issue #7726:
URL: https://github.com/apache/pulsar/issues/7726#issuecomment-669085854


   Could you attempt it in python client?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] become-nice commented on issue #7726: Consumer can't receive message from topic

Posted by GitBox <gi...@apache.org>.
become-nice commented on issue #7726:
URL: https://github.com/apache/pulsar/issues/7726#issuecomment-668933910


   You should just close the main process and don't close you client by code,just close the process.  


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower edited a comment on issue #7726: Consumer can't receive message from topic

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #7726:
URL: https://github.com/apache/pulsar/issues/7726#issuecomment-668380283


   I wrote a simple C++ consumer but could not reproduce the problem:
   
   ```c++
   // SampleConsumer.cc in examples directory
   #include <iostream>
   #include <pulsar/Client.h>
   
   using namespace std;
   using namespace pulsar;
   
   int main(int argc, char* argv[]) {
       std::string topic = "ParTopic";  // a topic with 4 partitions
       std::vector<std::string> partitions;
       for (int i = 1; i < argc; i++) {
           partitions.emplace_back(topic + "-partition-" + argv[i]);
       }
       size_t index = partitions.size();
   
       Client client("pulsar://localhost:6650");
   
       ConsumerConfiguration consumerConfig;
       consumerConfig.setConsumerType(ConsumerKeyShared);
   
       std::string subName = "SubscriptionName";
       Consumer consumer;
       if (partitions.empty()) {
           client.subscribe(topic, subName, consumerConfig, consumer);
       } else {
           client.subscribe(partitions, subName, consumerConfig, consumer);
       }
       Message msg;
   
       while (true) {
           Result result = consumer.receive(msg);
           cout << "[" << index << "] " << msg.getDataAsString() << ", key: " << msg.getPartitionKey()
                << ", id: " << msg.getMessageId() << endl;
           consumer.acknowledge(msg);
       }
       client.close();
   }
   ```
   
   First run a producer, and a consumer subscribing the partitioned topic (C1):
   
   ```
   $ ./SampleConsumer
   ```
   
   Then run a consumer subscribing two partitions (C2), eg, 0 and 2:
   
   ```
   $ ./SampleConsumer 2 3
   ```
   
   The example log of two cases are:
   
   #### partition 0 and 1
   
   C1:
   
   ```
   [0] 11bbbb, key: 20000, id: (42970,476,3,0)
   [0] 11aaaa, key: 10000, id: (43000,950,1,0)
   ```
   
   C2:
   
   ```
   [2] 11dddd, key: 40000, id: (43001,472,-1,0)
   [2] 11cccc, key: 30000, id: (43000,949,-1,0)
   ```
   
   #### partition 2 and 3
   
   C1:
   
   ```
   [0] 0cccc, key: 30000, id: (43000,955,1,0)
   [0] 0aaaa, key: 10000, id: (43000,956,1,0)
   [0] 0dddd, key: 40000, id: (43001,475,0,0)
   ```
   
   C2:
   
   ```
   [2] 0bbbb, key: 20000, id: (42970,479,-1,0)
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org