You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/21 15:45:48 UTC

[GitHub] sijie closed pull request #2585: [tests] Make BrokerClientIntegrationTest testing behavior deterministic

sijie closed pull request #2585: [tests] Make BrokerClientIntegrationTest testing behavior deterministic
URL: https://github.com/apache/incubator-pulsar/pull/2585
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise show it after the merge:

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 0e47dfe20b..314ca699c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -54,6 +54,7 @@
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -292,16 +293,18 @@ public void testCloseBrokerService() throws Exception {
     public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final int batchMessageDelayMs = 1000;
         final String topicName = "persistent://my-property/my-ns/my-topic1";
         final String subscriptionName = "my-subscriber-name" + subType;
 
         ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName(subscriptionName).subscriptionType(subType).subscribe();
 
+        final int numMessagesPerBatch = 10;
+
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName).enableBatching(true)
-                .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS).batchingMaxMessages(20).create();
+                .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.SECONDS)
+                .batchingMaxMessages(numMessagesPerBatch).create();
 
         // update consumer's version to incompatible batch-message version = Version.V3
         Topic topic = pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -315,13 +318,13 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
         versionField.set(cnx, 3);
 
         // (1) send non-batch message: consumer should be able to consume
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < numMessagesPerBatch; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
         Set<String> messageSet = Sets.newHashSet();
         Message<byte[]> msg = null;
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < numMessagesPerBatch; i++) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
             String expectedMessage = "my-message-" + i;
@@ -333,12 +336,11 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
         // verification
         consumer1.setClientCnx(null);
         // (2) send batch-message which should not be able to consume: as broker will disconnect the consumer
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < numMessagesPerBatch; i++) {
             String message = "my-message-" + i;
             batchProducer.sendAsync(message.getBytes());
         }
-
-        Thread.sleep(batchMessageDelayMs);
+        batchProducer.flush();
 
         // consumer should have not received any message as it should have been disconnected
         msg = consumer1.receive(2, TimeUnit.SECONDS);
@@ -349,7 +351,7 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
                 .subscribe();
 
         messageSet.clear();
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < numMessagesPerBatch; i++) {
             msg = consumer2.receive(1, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
             log.debug("Received message: [{}]", receivedMessage);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services