You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/13 02:31:17 UTC

[GitHub] [pulsar] jiangmincong opened a new issue #7805: Why use batchReceive, you can consume data for the first time, but cannot consume data afterwards; topic has been producing data

jiangmincong opened a new issue #7805:
URL: https://github.com/apache/pulsar/issues/7805


   Why use batchReceive, you can consume data for the first time, but cannot consume data afterwards; topic has been producing data
   
   code:
   Consumer<byte[]> consumer
   		/*= client.newConsumer()
   		.topics(ConfigMsg.subscribeTopicNames)
   		.subscriptionType(SubscriptionType.Shared)
   		.subscriptionName(ConfigMsg.subscriptionName)
   		.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
   		.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY)
   		.subscribe();*/
   = client.newConsumer()
   		.topics(ConfigMsg.subscribeTopicNames)
   		.subscriptionType(SubscriptionType.Shared)
   		.subscriptionName(ConfigMsg.subscriptionName)
   		.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
   		//.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY) // 默认批次消费策略
   		.batchReceivePolicy(
   				BatchReceivePolicy
   						.builder()
   						.maxNumMessages(ConfigMsg.batchReceiveMaxNumMsgs)
   						.maxNumBytes(ConfigMsg.batchReceiveMaxNumBytes)
   						.timeout(ConfigMsg.batchReceiveTimeout, TimeUnit.MILLISECONDS)
   						.build()
   		)
   		.subscribe();
   
   while (true) {
   	// Wait for a message
   	Messages messages = consumer.batchReceive();
   	System.out.println("-------------size: " + messages.size() + "------------");
   	try {
   		Iterator iterator = messages.iterator();
   		while(iterator.hasNext()){
   			Message msg = (Message) iterator.next();
   			System.out.printf("Message received: %s \n", new String(msg.getData()));
   		}
   		// Acknowledge the message so that it can be deleted by the message broker
   		consumer.acknowledge(messages);
   	} catch (Exception e) {
   		e.printStackTrace();
   		consumer.negativeAcknowledge(messages);
   	}
   }
   
   ![image](https://user-images.githubusercontent.com/22076140/90087664-e86f8900-dd4f-11ea-975a-48dae058170e.png)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on issue #7805: Why use batchReceive, you can consume data for the first time, but cannot consume data afterwards; topic has been producing data

Posted by GitBox <gi...@apache.org>.
sijie commented on issue #7805:
URL: https://github.com/apache/pulsar/issues/7805#issuecomment-673840221


   @jiangmincong Which pulsar version are you using?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jiangmincong commented on issue #7805: Why use batchReceive, you can consume data for the first time, but cannot consume data afterwards; topic has been producing data

Posted by GitBox <gi...@apache.org>.
jiangmincong commented on issue #7805:
URL: https://github.com/apache/pulsar/issues/7805#issuecomment-673852014


   Version 2.5.0, I set the receiverQueueSize, no problem


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jiangmincong closed issue #7805: Why use batchReceive, you can consume data for the first time, but cannot consume data afterwards; topic has been producing data

Posted by GitBox <gi...@apache.org>.
jiangmincong closed issue #7805:
URL: https://github.com/apache/pulsar/issues/7805


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org