You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/03 05:18:19 UTC

[GitHub] sijie closed pull request #2648: [tests] create consumers before producing for DeadLetterTopicTest

sijie closed pull request #2648: [tests] create consumers before producing for DeadLetterTopicTest
URL: https://github.com/apache/pulsar/pull/2648
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it once the pull request is merged):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index a0a286be21..2ff500ba36 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -53,18 +53,6 @@ public void testDeadLetterTopic() throws Exception {
 
         final int sendMessages = 100;
 
-        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
-                .topic(topic)
-                .create();
-
-        for (int i = 0; i < sendMessages; i++) {
-            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
-        }
-
-        producer.close();
-
-
-
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic)
                 .subscriptionName("my-subscription")
@@ -78,8 +66,19 @@ public void testDeadLetterTopic() throws Exception {
         Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
 
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+
+        producer.close();
+
         int totalReceived = 0;
         do {
             Message<byte[]> message = consumer.receive();
@@ -127,6 +126,24 @@ public void testDeadLetterTopicWithMultiTopic() throws Exception {
 
         int sendMessages = 100;
 
+        // subscribe to the original topics before publish
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic1, topic2)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        // subscribe to the DLQ topics before consuming original topics
+        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic("persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ", "persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
         Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
                 .topic(topic1)
                 .create();
@@ -145,26 +162,11 @@ public void testDeadLetterTopicWithMultiTopic() throws Exception {
         producer1.close();
         producer2.close();
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
-                .topic(topic1, topic2)
-                .subscriptionName("my-subscription")
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(3, TimeUnit.SECONDS)
-                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
-                .receiverQueueSize(100)
-                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                .subscribe();
-
-        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
-                .topic("persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ", "persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ")
-                .subscriptionName("my-subscription")
-                .subscribe();
-
         int totalReceived = 0;
         do {
             Message<byte[]> message = consumer.receive();
-            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
-            totalReceived++;
+            log.info("consumer received message : {} {} - total = {}",
+                message.getMessageId(), new String(message.getData()), ++totalReceived);
         } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
 
         int totalInDeadLetter = 0;
@@ -199,13 +201,8 @@ public void testDeadLetterTopicByCustomTopicName() throws Exception {
         final String topic = "persistent://my-property/my-ns/dead-letter-topic";
         final int maxRedeliveryCount = 2;
         final int sendMessages = 100;
-        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
-                .topic(topic)
-                .create();
-        for (int i = 0; i < sendMessages; i++) {
-            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
-        }
-        producer.close();
+
+        // subscribe before publish
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic)
                 .subscriptionName("my-subscription")
@@ -222,6 +219,15 @@ public void testDeadLetterTopicByCustomTopicName() throws Exception {
                 .topic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ")
                 .subscriptionName("my-subscription")
                 .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer.close();
+
         int totalReceived = 0;
         do {
             Message<byte[]> message = consumer.receive();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services