You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/09/11 21:19:34 UTC
[pulsar] branch master updated: Fixed spark receiver to account for
all the consumer config options (#5152)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6b1921a Fixed spark receiver to account for all the consumer config options (#5152)
6b1921a is described below
commit 6b1921aef3a7d0be6e1d63fcc32f37333ad569de
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Sep 11 14:19:27 2019 -0700
Fixed spark receiver to account for all the consumer config options (#5152)
---
.../pulsar/spark/SparkStreamingPulsarReceiver.java | 22 ++++------
.../spark/SparkStreamingPulsarReceiverTest.java | 50 +++++++++++++++++++++-
2 files changed, 57 insertions(+), 15 deletions(-)
diff --git a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
index 8e124ed..833d42e 100644
--- a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
+++ b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
@@ -18,23 +18,23 @@
*/
package org.apache.pulsar.spark;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import java.io.Serializable;
-import java.util.Set;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
@@ -66,16 +66,14 @@ public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
this.serviceUrl = serviceUrl;
this.authentication = authentication;
- if (conf.getAckTimeoutMillis() == 0) {
- conf.setAckTimeoutMillis(60000);
- }
if (conf.getMessageListener() == null) {
- conf.setMessageListener((MessageListener & Serializable) (consumer, msg) -> {
+ conf.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
try {
store(msg.getData());
consumer.acknowledgeAsync(msg);
} catch (Exception e) {
LOG.error("Failed to store a message : {}", e.getMessage());
+ consumer.negativeAcknowledge(msg);
}
});
}
@@ -84,13 +82,9 @@ public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
public void onStart() {
try {
- Set<String> topicNames = conf.getTopicNames();
- String[] topicNamesArray = new String[topicNames.size()];
- topicNames.toArray(topicNamesArray);
pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
- consumer = pulsarClient.newConsumer().topic(topicNamesArray).subscriptionName(conf.getSubscriptionName())
- .messageListener(this.conf.getMessageListener()).subscribe();
- } catch (PulsarClientException e) {
+ consumer = ((PulsarClientImpl) pulsarClient).subscribeAsync(conf).join();
+ } catch (Exception e) {
LOG.error("Failed to start subscription : {}", e.getMessage());
restart("Restart a consumer");
}
diff --git a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
index 7504948..bd619a5 100644
--- a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
+++ b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
@@ -23,13 +23,18 @@ import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
@@ -94,10 +99,53 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
new AuthenticationDisabled());
assertEquals(receiver.storageLevel(), StorageLevel.MEMORY_AND_DISK_2());
- assertEquals(consConf.getAckTimeoutMillis(), 60_000);
assertNotNull(consConf.getMessageListener());
}
+ @Test(dataProvider = "ServiceUrls")
+ public void testSharedSubscription(String serviceUrl) throws Exception {
+ ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
+
+ Set<String> set = new HashSet<>();
+ set.add(TOPIC);
+ consConf.setTopicNames(set);
+ consConf.setSubscriptionName(SUBS);
+ consConf.setSubscriptionType(SubscriptionType.Shared);
+ consConf.setReceiverQueueSize(1);
+
+ Map<String, MutableInt> receveidCounts = new HashMap<>();
+
+ consConf.setMessageListener((consumer, msg) -> {
+ receveidCounts.computeIfAbsent(consumer.getConsumerName(), x -> new MutableInt(0)).increment();
+ });
+
+ SparkStreamingPulsarReceiver receiver1 = new SparkStreamingPulsarReceiver(
+ serviceUrl,
+ consConf,
+ new AuthenticationDisabled());
+
+ SparkStreamingPulsarReceiver receiver2 = new SparkStreamingPulsarReceiver(
+ serviceUrl,
+ consConf,
+ new AuthenticationDisabled());
+
+ receiver1.onStart();
+ receiver2.onStart();
+ waitForTransmission();
+
+ PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+ Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
+ for (int i = 0; i < 10; i++) {
+ producer.send(EXPECTED_MESSAGE.getBytes());
+ }
+
+ waitForTransmission();
+ receiver1.onStop();
+ receiver2.onStop();
+
+ assertEquals(receveidCounts.size(), 2);
+ }
+
@Test(expectedExceptions = NullPointerException.class,
expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
dataProvider = "ServiceUrls")