You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/09/11 21:19:34 UTC

[pulsar] branch master updated: Fixed spark receiver to account for all the consumer config options (#5152)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b1921a  Fixed spark receiver to account for all the consumer config options (#5152)
6b1921a is described below

commit 6b1921aef3a7d0be6e1d63fcc32f37333ad569de
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Sep 11 14:19:27 2019 -0700

    Fixed spark receiver to account for all the consumer config options (#5152)
---
 .../pulsar/spark/SparkStreamingPulsarReceiver.java | 22 ++++------
 .../spark/SparkStreamingPulsarReceiverTest.java    | 50 +++++++++++++++++++++-
 2 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
index 8e124ed..833d42e 100644
--- a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
+++ b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
@@ -18,23 +18,23 @@
  */
 package org.apache.pulsar.spark;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.Serializable;
-import java.util.Set;
 
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.receiver.Receiver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
@@ -66,16 +66,14 @@ public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
         this.serviceUrl = serviceUrl;
         this.authentication = authentication;
 
-        if (conf.getAckTimeoutMillis() == 0) {
-            conf.setAckTimeoutMillis(60000);
-        }
         if (conf.getMessageListener() == null) {
-            conf.setMessageListener((MessageListener & Serializable) (consumer, msg) -> {
+            conf.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
                 try {
                     store(msg.getData());
                     consumer.acknowledgeAsync(msg);
                 } catch (Exception e) {
                     LOG.error("Failed to store a message : {}", e.getMessage());
+                    consumer.negativeAcknowledge(msg);
                 }
             });
         }
@@ -84,13 +82,9 @@ public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
 
     public void onStart() {
         try {
-            Set<String> topicNames = conf.getTopicNames();
-            String[] topicNamesArray = new String[topicNames.size()];
-            topicNames.toArray(topicNamesArray);
             pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
-            consumer = pulsarClient.newConsumer().topic(topicNamesArray).subscriptionName(conf.getSubscriptionName())
-                .messageListener(this.conf.getMessageListener()).subscribe();
-        } catch (PulsarClientException e) {
+            consumer = ((PulsarClientImpl) pulsarClient).subscribeAsync(conf).join();
+        } catch (Exception e) {
             LOG.error("Failed to start subscription : {}", e.getMessage());
             restart("Restart a consumer");
         }
diff --git a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
index 7504948..bd619a5 100644
--- a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
+++ b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
@@ -23,13 +23,18 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
@@ -94,10 +99,53 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
             new AuthenticationDisabled());
 
         assertEquals(receiver.storageLevel(), StorageLevel.MEMORY_AND_DISK_2());
-        assertEquals(consConf.getAckTimeoutMillis(), 60_000);
         assertNotNull(consConf.getMessageListener());
     }
 
+    @Test(dataProvider = "ServiceUrls")
+    public void testSharedSubscription(String serviceUrl) throws Exception {
+        ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
+
+        Set<String> set = new HashSet<>();
+        set.add(TOPIC);
+        consConf.setTopicNames(set);
+        consConf.setSubscriptionName(SUBS);
+        consConf.setSubscriptionType(SubscriptionType.Shared);
+        consConf.setReceiverQueueSize(1);
+
+        Map<String, MutableInt> receveidCounts = new HashMap<>();
+
+        consConf.setMessageListener((consumer, msg) -> {
+            receveidCounts.computeIfAbsent(consumer.getConsumerName(), x -> new MutableInt(0)).increment();
+        });
+
+        SparkStreamingPulsarReceiver receiver1 = new SparkStreamingPulsarReceiver(
+            serviceUrl,
+            consConf,
+            new AuthenticationDisabled());
+
+        SparkStreamingPulsarReceiver receiver2 = new SparkStreamingPulsarReceiver(
+                serviceUrl,
+                consConf,
+                new AuthenticationDisabled());
+
+        receiver1.onStart();
+        receiver2.onStart();
+        waitForTransmission();
+
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+        Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
+        for (int i = 0; i < 10; i++) {
+            producer.send(EXPECTED_MESSAGE.getBytes());
+        }
+
+        waitForTransmission();
+        receiver1.onStop();
+        receiver2.onStop();
+
+        assertEquals(receveidCounts.size(), 2);
+    }
+
     @Test(expectedExceptions = NullPointerException.class,
             expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
             dataProvider = "ServiceUrls")