You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ay...@apache.org on 2023/03/29 23:09:40 UTC
[pulsar-adapters] branch master updated: Strom adapter moved to Apache Storm community (#47)
This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
The following commit(s) were added to refs/heads/master by this push:
new ab537c1 Strom adapter moved to Apache Storm community (#47)
ab537c1 is described below
commit ab537c13061b06de2044cd43965de930d762fe8a
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Wed Mar 29 16:09:33 2023 -0700
Strom adapter moved to Apache Storm community (#47)
---
.asf.yaml | 3 +-
README.md | 4 +-
pom.xml | 19 -
pulsar-storm/pom.xml | 102 -----
.../apache/pulsar/storm/MessageToValuesMapper.java | 44 --
.../java/org/apache/pulsar/storm/PulsarBolt.java | 209 ---------
.../pulsar/storm/PulsarBoltConfiguration.java | 57 ---
.../java/org/apache/pulsar/storm/PulsarSpout.java | 485 ---------------------
.../pulsar/storm/PulsarSpoutConfiguration.java | 195 ---------
.../apache/pulsar/storm/PulsarSpoutConsumer.java | 58 ---
.../pulsar/storm/PulsarStormConfiguration.java | 90 ----
.../java/org/apache/pulsar/storm/PulsarTuple.java | 45 --
.../apache/pulsar/storm/SharedPulsarClient.java | 153 -------
.../apache/pulsar/storm/TupleToMessageMapper.java | 66 ---
pulsar-storm/src/main/javadoc/overview.html | 29 --
.../org/apache/pulsar/storm/PulsarSpoutTest.java | 178 --------
tests/pom.xml | 1 -
tests/pulsar-storm-test/pom.xml | 131 ------
.../apache/pulsar/storm/MockOutputCollector.java | 101 -----
.../pulsar/storm/MockSpoutOutputCollector.java | 80 ----
.../org/apache/pulsar/storm/PulsarBoltTest.java | 236 ----------
.../org/apache/pulsar/storm/PulsarSpoutTest.java | 349 ---------------
.../java/org/apache/pulsar/storm/TestUtil.java | 35 --
.../apache/pulsar/storm/example/StormExample.java | 166 -------
24 files changed, 4 insertions(+), 2832 deletions(-)
diff --git a/.asf.yaml b/.asf.yaml
index 74589a2..c04e151 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -27,7 +27,6 @@ github:
- streaming
- queuing
- event-streaming
- - apache-storm
- apache-spark
- apache-kafka
features:
@@ -48,4 +47,4 @@ github:
notifications:
commits: commits@pulsar.apache.org
issues: commits@pulsar.apache.org
- pullrequests: commits@pulsar.apache.org
\ No newline at end of file
+ pullrequests: commits@pulsar.apache.org
diff --git a/README.md b/README.md
index ce62338..33409df 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,8 @@ This repository is used for hosting all the adapters maintained and supported by
[Apache Flink adapter](https://github.com/apache/flink-connector-pulsar) is supported and maintained by Apache Flink Community.
+[Apache Storm bolt and spout](https://github.com/apache/storm/tree/master/external/storm-pulsar) are supported by Apache Storm Community.
+
## Building
In order to build this code you can simply use Maven
@@ -42,5 +44,5 @@ git checkout v2.11.0
mvn clean install -DskipTests
```
-This is because this repository depends on test integration artifacts of the relative version on the main
+This is because this repository depends on test integration artifacts of the relative version on the main
Apache Pulsar codebase
diff --git a/pom.xml b/pom.xml
index 249585f..3527230 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,6 @@
<properties>
<pulsar.version>2.11.0</pulsar.version>
- <storm.version>2.0.0</storm.version>
<kafka-client.version>2.7.0</kafka-client.version>
<kafka_0_8.version>0.8.1.1</kafka_0_8.version>
<avro.version>1.10.2</avro.version>
@@ -139,7 +138,6 @@
</properties>
<modules>
- <module>pulsar-storm</module>
<module>pulsar-spark</module>
<module>pulsar-client-kafka-compat</module>
<module>pulsar-log4j2-appender</module>
@@ -251,22 +249,6 @@
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-client</artifactId>
- <version>${storm.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
- <version>${storm.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${storm.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
@@ -1028,4 +1010,3 @@
</repositories>
</project>
-
diff --git a/pulsar-storm/pom.xml b/pulsar-storm/pom.xml
deleted file mode 100644
index a20649b..0000000
--- a/pulsar-storm/pom.xml
+++ /dev/null
@@ -1,102 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-adapters</artifactId>
- <version>2.11.0-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>pulsar-storm</artifactId>
- <name>Pulsar Storm adapter</name>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-common</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.testng</groupId>
- <artifactId>testng</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.yaml</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-client</artifactId>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- <filtering>true</filtering>
- </resource>
- </resources>
- </build>
-</project>
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
deleted file mode 100644
index 92e127c..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.io.Serializable;
-
-import org.apache.pulsar.client.api.Message;
-
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Values;
-
-public interface MessageToValuesMapper extends Serializable {
-
- /**
- * Convert {@link org.apache.pulsar.client.api.Message} to tuple values.
- *
- * @param msg
- * @return
- */
- Values toValues(Message<byte[]> msg);
-
- /**
- * Declare the output schema for the spout.
- *
- * @param declarer
- */
- void declareOutputFields(OutputFieldsDeclarer declarer);
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
deleted file mode 100644
index d331ca2..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static java.lang.String.format;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class PulsarBolt extends BaseRichBolt implements IMetric {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(PulsarBolt.class);
-
- public static final String NO_OF_MESSAGES_SENT = "numberOfMessagesSent";
- public static final String PRODUCER_RATE = "producerRate";
- public static final String PRODUCER_THROUGHPUT_BYTES = "producerThroughput";
-
- private final ClientConfigurationData clientConf;
- private final ProducerConfigurationData producerConf;
- private final PulsarBoltConfiguration pulsarBoltConf;
- private final ConcurrentMap<String, Object> metricsMap = new ConcurrentHashMap<>();
-
- private SharedPulsarClient sharedPulsarClient;
- private String componentId;
- private String boltId;
- private OutputCollector collector;
- private Producer<byte[]> producer;
- private volatile long messagesSent = 0;
- private volatile long messageSizeSent = 0;
-
- public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf) {
- this(pulsarBoltConf, PulsarClient.builder());
- }
-
- public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder clientBuilder) {
- this(pulsarBoltConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(),
- new ProducerConfigurationData());
- }
-
- public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfigurationData clientConf,
- ProducerConfigurationData producerConf) {
- checkNotNull(pulsarBoltConf, "bolt configuration can't be null");
- checkNotNull(clientConf, "client configuration can't be null");
- checkNotNull(producerConf, "producer configuration can't be null");
- Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
- Objects.requireNonNull(pulsarBoltConf.getTopic());
- Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
- this.pulsarBoltConf = pulsarBoltConf;
- this.clientConf = clientConf;
- this.producerConf = producerConf;
- this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
- this.producerConf.setTopicName(pulsarBoltConf.getTopic());
- this.producerConf.setBatcherBuilder(null);
- }
-
- @SuppressWarnings({ "rawtypes" })
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- this.componentId = context.getThisComponentId();
- this.boltId = String.format("%s-%s", componentId, context.getThisTaskId());
- this.collector = collector;
- try {
- sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
- producer = sharedPulsarClient.getSharedProducer(producerConf);
- LOG.info("[{}] Created a pulsar producer on topic {} to send messages", boltId, pulsarBoltConf.getTopic());
- } catch (PulsarClientException e) {
- LOG.error("[{}] Error initializing pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic(), e);
- throw new IllegalStateException(
- format("Failed to initialize producer for %s : %s", pulsarBoltConf.getTopic(), e.getMessage()), e);
- }
- context.registerMetric(String.format("PulsarBoltMetrics-%s-%s", componentId, context.getThisTaskIndex()), this,
- pulsarBoltConf.getMetricsTimeIntervalInSecs());
- }
-
- @Override
- public void execute(Tuple input) {
- if (TupleUtils.isTick(input)) {
- collector.ack(input);
- return;
- }
- try {
- if (producer != null) {
- // a message key can be provided in the mapper
- TypedMessageBuilder<byte[]> msgBuilder = pulsarBoltConf.getTupleToMessageMapper()
- .toMessage(producer.newMessage(), input);
- if (msgBuilder == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Cannot send null message, acking the collector", boltId);
- }
- collector.ack(input);
- } else {
- final long messageSizeToBeSent = ((TypedMessageBuilderImpl<byte[]>) msgBuilder).getContent()
- .remaining();
- msgBuilder.sendAsync().handle((msgId, ex) -> {
- synchronized (collector) {
- if (ex != null) {
- collector.reportError(ex);
- collector.fail(input);
- LOG.error("[{}] Message send failed", boltId, ex);
-
- } else {
- collector.ack(input);
- ++messagesSent;
- messageSizeSent += messageSizeToBeSent;
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Message sent with id {}", boltId, msgId);
- }
- }
- }
-
- return null;
- });
- }
- }
- } catch (Exception e) {
- LOG.error("[{}] Message processing failed", boltId, e);
- collector.reportError(e);
- collector.fail(input);
- }
- }
-
- public void close() {
- try {
- LOG.info("[{}] Closing Pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic());
- if (sharedPulsarClient != null) {
- sharedPulsarClient.close();
- }
- } catch (PulsarClientException e) {
- LOG.error("[{}] Error closing Pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic(), e);
- }
- }
-
- @Override
- public void cleanup() {
- close();
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- pulsarBoltConf.getTupleToMessageMapper().declareOutputFields(declarer);
- }
-
- /**
- * Helpers for metrics
- */
-
- @SuppressWarnings({ "rawtypes" })
- ConcurrentMap getMetrics() {
- metricsMap.put(NO_OF_MESSAGES_SENT, messagesSent);
- metricsMap.put(PRODUCER_RATE, ((double) messagesSent) / pulsarBoltConf.getMetricsTimeIntervalInSecs());
- metricsMap.put(PRODUCER_THROUGHPUT_BYTES,
- ((double) messageSizeSent) / pulsarBoltConf.getMetricsTimeIntervalInSecs());
- return metricsMap;
- }
-
- void resetMetrics() {
- messagesSent = 0;
- messageSizeSent = 0;
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public Object getValueAndReset() {
- ConcurrentMap metrics = getMetrics();
- resetMetrics();
- return metrics;
- }
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
deleted file mode 100644
index 714e435..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.Objects;
-
-/**
- * Class used to specify Pulsar bolt configuration
- *
- *
- */
-public class PulsarBoltConfiguration extends PulsarStormConfiguration {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- private TupleToMessageMapper tupleToMessageMapper = null;
-
- /**
- * @return the mapper to convert storm tuples to a pulsar message
- */
- public TupleToMessageMapper getTupleToMessageMapper() {
- return tupleToMessageMapper;
- }
-
- /**
- * Sets the mapper to convert storm tuples to a pulsar message
- * <p>
- * Note: If the mapper returns null, the message is not sent by the producer and is acked immediately on the
- * collector
- * </p>
- *
- * @param mapper
- */
- public void setTupleToMessageMapper(TupleToMessageMapper mapper) {
- this.tupleToMessageMapper = Objects.requireNonNull(mapper);
- }
-
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
deleted file mode 100644
index 63887a4..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ /dev/null
@@ -1,485 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static java.lang.String.format;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.impl.Backoff;
-import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PulsarSpout extends BaseRichSpout implements IMetric {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class);
-
- public static final String NO_OF_PENDING_FAILED_MESSAGES = "numberOfPendingFailedMessages";
- public static final String NO_OF_MESSAGES_RECEIVED = "numberOfMessagesReceived";
- public static final String NO_OF_MESSAGES_EMITTED = "numberOfMessagesEmitted";
- public static final String NO_OF_MESSAGES_FAILED = "numberOfMessagesFailed";
- public static final String MESSAGE_NOT_AVAILABLE_COUNT = "messageNotAvailableCount";
- public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
- public static final String CONSUMER_RATE = "consumerRate";
- public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
-
- private final ClientConfigurationData clientConf;
- private final PulsarSpoutConfiguration pulsarSpoutConf;
- private final ConsumerConfigurationData<byte[]> consumerConf;
- private final long failedRetriesTimeoutNano;
- private final int maxFailedRetries;
- private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = new ConcurrentHashMap<>();
- private final Queue<Message<byte[]>> failedMessages = new ConcurrentLinkedQueue<>();
- private final ConcurrentMap<String, Object> metricsMap = new ConcurrentHashMap<>();
-
- private SharedPulsarClient sharedPulsarClient;
- private String componentId;
- private String spoutId;
- private SpoutOutputCollector collector;
- private PulsarSpoutConsumer consumer;
- private volatile long messagesReceived = 0;
- private volatile long messagesEmitted = 0;
- private volatile long messagesFailed = 0;
- private volatile long messageNotAvailableCount = 0;
- private volatile long pendingAcks = 0;
- private volatile long messageSizeReceived = 0;
-
- public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf) {
- this(pulsarSpoutConf, PulsarClient.builder());
- }
-
- public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clientBuilder) {
- this(pulsarSpoutConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(),
- new ConsumerConfigurationData<byte[]>());
- }
-
- public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfigurationData clientConfig,
- ConsumerConfigurationData<byte[]> consumerConfig) {
- Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
- Objects.requireNonNull(pulsarSpoutConf.getTopic());
- Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
- Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
-
- checkNotNull(pulsarSpoutConf, "spout configuration can't be null");
- checkNotNull(clientConfig, "client configuration can't be null");
- checkNotNull(consumerConfig, "consumer configuration can't be null");
- this.clientConf = clientConfig;
- this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
- this.consumerConf = consumerConfig;
- this.pulsarSpoutConf = pulsarSpoutConf;
- this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
- this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
- }
-
- @Override
- public void close() {
- try {
- LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic());
-
- if (pulsarSpoutConf.isAutoUnsubscribe()) {
- try {
- consumer.unsubscribe();
- }catch(PulsarClientException e) {
- LOG.error("[{}] Failed to unsubscribe {} on topic {}", spoutId,
- this.pulsarSpoutConf.getSubscriptionName(), pulsarSpoutConf.getTopic(), e);
- }
- }
-
- if (!pulsarSpoutConf.isSharedConsumerEnabled() && consumer != null) {
- consumer.close();
- }
- if (sharedPulsarClient != null) {
- sharedPulsarClient.close();
- }
- pendingMessageRetries.clear();
- failedMessages.clear();
- } catch (PulsarClientException e) {
- LOG.error("[{}] Error closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic(), e);
- }
- }
-
- @Override
- public void ack(Object msgId) {
- if (msgId instanceof Message) {
- Message<?> msg = (Message<?>) msgId;
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Received ack for message {}", spoutId, msg.getMessageId());
- }
- consumer.acknowledgeAsync(msg);
- pendingMessageRetries.remove(msg.getMessageId());
- // we should also remove message from failedMessages but it will be eventually removed while emitting next
- // tuple
- --pendingAcks;
- }
- }
-
- @Override
- public void fail(Object msgId) {
- if (msgId instanceof Message) {
- @SuppressWarnings("unchecked")
- Message<byte[]> msg = (Message<byte[]>) msgId;
- MessageId id = msg.getMessageId();
- LOG.warn("[{}] Error processing message {}", spoutId, id);
-
- // Since the message processing failed, we put it in the failed messages queue if there are more retries
- // remaining for the message
- MessageRetries messageRetries = pendingMessageRetries.computeIfAbsent(id, (k) -> new MessageRetries());
- if ((failedRetriesTimeoutNano < 0
- || (messageRetries.getTimeStamp() + failedRetriesTimeoutNano) > System.nanoTime())
- && (maxFailedRetries < 0 || messageRetries.numRetries < maxFailedRetries)) {
- // since we can retry again, we increment retry count and put it in the queue
- LOG.info("[{}] Putting message {} in the retry queue", spoutId, id);
- messageRetries.incrementAndGet();
- pendingMessageRetries.putIfAbsent(id, messageRetries);
- failedMessages.add(msg);
- --pendingAcks;
- messagesFailed++;
- } else {
- LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id);
- ack(msg);
- }
- }
-
- }
-
- /**
- * Emits a tuple received from the Pulsar consumer unless there are any failed messages
- */
- @Override
- public void nextTuple() {
- emitNextAvailableTuple();
- }
-
- /**
- * It makes sure that it emits next available non-tuple to topology unless consumer queue doesn't have any message
- * available. It receives message from consumer queue and converts it to tuple and emits to topology. if the
- * converted tuple is null then it tries to receives next message and perform the same until it finds non-tuple to
- * emit.
- */
- public void emitNextAvailableTuple() {
- // check if there are any failed messages to re-emit in the topology
- if(emitFailedMessage()) {
- return;
- }
-
- Message<byte[]> msg;
- // receive from consumer if no failed messages
- if (consumer != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Receiving the next message from pulsar consumer to emit to the collector", spoutId);
- }
- try {
- boolean done = false;
- while (!done) {
- msg = consumer.receive(100, TimeUnit.MILLISECONDS);
- if (msg != null) {
- ++messagesReceived;
- messageSizeReceived += msg.getData().length;
- done = mapToValueAndEmit(msg);
- } else {
- // queue is empty and nothing to emit
- done = true;
- messageNotAvailableCount++;
- }
- }
- } catch (PulsarClientException e) {
- LOG.error("[{}] Error receiving message from pulsar consumer", spoutId, e);
- }
- }
- }
-
- private boolean emitFailedMessage() {
- Message<byte[]> msg;
-
- while ((msg = failedMessages.peek()) != null) {
- MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId());
- if (messageRetries != null) {
- // emit the tuple if retry doesn't need backoff else sleep with backoff time and return without doing
- // anything
- if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
- messageRetries.getNumRetries(), clientConf.getInitialBackoffIntervalNanos(),
- clientConf.getMaxBackoffIntervalNanos())) {
- Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getInitialBackoffIntervalNanos()));
- } else {
- // remove the message from the queue and emit to the topology, only if it should not be backedoff
- LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());
- failedMessages.remove();
- mapToValueAndEmit(msg);
- }
- return true;
- }
-
- // messageRetries is null because messageRetries is already acked and removed from pendingMessageRetries
- // then remove it from failed message queue as well.
- if(LOG.isDebugEnabled()) {
- LOG.debug("[{}]-{} removing {} from failedMessage because it's already acked",
- pulsarSpoutConf.getTopic(), spoutId, msg.getMessageId());
- }
- failedMessages.remove();
- // try to find out next failed message
- continue;
- }
- return false;
- }
-
- @Override
- @SuppressWarnings({ "rawtypes" })
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.componentId = context.getThisComponentId();
- this.spoutId = String.format("%s-%s", componentId, context.getThisTaskId());
- this.collector = collector;
- pendingMessageRetries.clear();
- failedMessages.clear();
- try {
- consumer = createConsumer();
- LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId,
- pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName());
- } catch (PulsarClientException e) {
- LOG.error("[{}] Error creating pulsar consumer on topic {}", spoutId, pulsarSpoutConf.getTopic(), e);
- throw new IllegalStateException(format("Failed to initialize consumer for %s-%s : %s",
- pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName(), e.getMessage()), e);
- }
- context.registerMetric(String.format("PulsarSpoutMetrics-%s-%s", componentId, context.getThisTaskIndex()), this,
- pulsarSpoutConf.getMetricsTimeIntervalInSecs());
- }
-
- private PulsarSpoutConsumer createConsumer() throws PulsarClientException {
- sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
- PulsarSpoutConsumer consumer;
- if (pulsarSpoutConf.isSharedConsumerEnabled()) {
- consumer = pulsarSpoutConf.isDurableSubscription()
- ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration()))
- : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration()));
- } else {
- try {
- consumer = pulsarSpoutConf.isDurableSubscription()
- ? new SpoutConsumer(sharedPulsarClient.getClient()
- .subscribeAsync(newConsumerConfiguration()).join())
- : new SpoutReader(sharedPulsarClient.getClient()
- .createReaderAsync(newReaderConfiguration()).join());
- } catch (CompletionException e) {
- throw (PulsarClientException) e.getCause();
- }
- }
- return consumer;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- pulsarSpoutConf.getMessageToValuesMapper().declareOutputFields(declarer);
-
- }
-
- private boolean mapToValueAndEmit(Message<byte[]> msg) {
- if (msg != null) {
- Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
- ++pendingAcks;
- if (values == null) {
- // since the mapper returned null, we can drop the message and ack it immediately
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Dropping message {}", spoutId, msg.getMessageId());
- }
- ack(msg);
- } else {
- if (values instanceof PulsarTuple) {
- collector.emit(((PulsarTuple) values).getOutputStream(), values, msg);
- } else {
- collector.emit(values, msg);
- }
- ++messagesEmitted;
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId());
- }
- return true;
- }
- }
- return false;
- }
-
- public class MessageRetries {
- private final long timestampInNano;
- private int numRetries;
-
- public MessageRetries() {
- this.timestampInNano = System.nanoTime();
- this.numRetries = 0;
- }
-
- public long getTimeStamp() {
- return timestampInNano;
- }
-
- public int incrementAndGet() {
- return ++numRetries;
- }
-
- public int getNumRetries() {
- return numRetries;
- }
- }
-
- /**
- * Helpers for metrics
- */
-
- @SuppressWarnings({ "rawtypes" })
- ConcurrentMap getMetrics() {
- metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long) pendingMessageRetries.size());
- metricsMap.put(NO_OF_MESSAGES_RECEIVED, messagesReceived);
- metricsMap.put(NO_OF_MESSAGES_EMITTED, messagesEmitted);
- metricsMap.put(NO_OF_MESSAGES_FAILED, messagesFailed);
- metricsMap.put(MESSAGE_NOT_AVAILABLE_COUNT, messageNotAvailableCount);
- metricsMap.put(NO_OF_PENDING_ACKS, pendingAcks);
- metricsMap.put(CONSUMER_RATE, ((double) messagesReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
- metricsMap.put(CONSUMER_THROUGHPUT_BYTES,
- ((double) messageSizeReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
- return metricsMap;
- }
-
- void resetMetrics() {
- messagesReceived = 0;
- messagesEmitted = 0;
- messageSizeReceived = 0;
- messagesFailed = 0;
- messageNotAvailableCount = 0;
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public Object getValueAndReset() {
- ConcurrentMap metrics = getMetrics();
- resetMetrics();
- return metrics;
- }
-
- private ReaderConfigurationData<byte[]> newReaderConfiguration() {
- ReaderConfigurationData<byte[]> readerConf = new ReaderConfigurationData<>();
- readerConf.setTopicName(pulsarSpoutConf.getTopic());
- readerConf.setReaderName(pulsarSpoutConf.getSubscriptionName());
- readerConf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition());
- if (this.consumerConf != null) {
- readerConf.setCryptoFailureAction(consumerConf.getCryptoFailureAction());
- readerConf.setCryptoKeyReader(consumerConf.getCryptoKeyReader());
- readerConf.setReadCompacted(consumerConf.isReadCompacted());
- readerConf.setReceiverQueueSize(consumerConf.getReceiverQueueSize());
- }
- return readerConf;
- }
-
- private ConsumerConfigurationData<byte[]> newConsumerConfiguration() {
- ConsumerConfigurationData<byte[]> consumerConf = this.consumerConf != null ? this.consumerConf
- : new ConsumerConfigurationData<>();
- consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
- consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
- consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
- return consumerConf;
- }
-
- static class SpoutConsumer implements PulsarSpoutConsumer {
- private Consumer<byte[]> consumer;
-
- public SpoutConsumer(Consumer<byte[]> consumer) {
- super();
- this.consumer = consumer;
- }
-
- @Override
- public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
- return consumer.receive(timeout, unit);
- }
-
- @Override
- public void acknowledgeAsync(Message<?> msg) {
- consumer.acknowledgeAsync(msg);
- }
-
- @Override
- public void close() throws PulsarClientException {
- consumer.close();
- }
-
- @Override
- public void unsubscribe() throws PulsarClientException {
- consumer.unsubscribe();
- }
-
- }
-
- static class SpoutReader implements PulsarSpoutConsumer {
- private Reader<byte[]> reader;
-
- public SpoutReader(Reader<byte[]> reader) {
- super();
- this.reader = reader;
- }
-
- @Override
- public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
- return reader.readNext(timeout, unit);
- }
-
- @Override
- public void acknowledgeAsync(Message<?> msg) {
- // No-op
- }
-
- @Override
- public void close() throws PulsarClientException {
- try {
- reader.close();
- } catch (IOException e) {
- throw new PulsarClientException(e);
- }
- }
-
- @Override
- public void unsubscribe() throws PulsarClientException {
- // No-op
- }
- }
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
deleted file mode 100644
index db797ee..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.SubscriptionType;
-
-/**
- * Class used to specify pulsar spout configuration
- *
- *
- */
-public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public static final long DEFAULT_FAILED_RETRIES_TIMEOUT_NANO = TimeUnit.SECONDS.toNanos(60);
- public static final int DEFAULT_MAX_FAILED_RETRIES = -1;
-
- private String subscriptionName = null;
- private MessageToValuesMapper messageToValuesMapper = null;
- private long failedRetriesTimeoutNano = DEFAULT_FAILED_RETRIES_TIMEOUT_NANO;
- private int maxFailedRetries = DEFAULT_MAX_FAILED_RETRIES;
- private boolean sharedConsumerEnabled = false;
-
- private SubscriptionType subscriptionType = SubscriptionType.Shared;
- private boolean autoUnsubscribe = false;
- private boolean durableSubscription = true;
- // read position if non-durable subscription is enabled : default oldest message available in topic
- private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest;
-
-
- /**
- * @return the subscription name for the consumer in the spout
- */
- public String getSubscriptionName() {
- return subscriptionName;
- }
-
- /**
- * Sets the subscription name for the consumer in the spout
- *
- * @param subscriptionName
- */
- public void setSubscriptionName(String subscriptionName) {
- this.subscriptionName = subscriptionName;
- }
-
- public SubscriptionType getSubscriptionType() {
- return subscriptionType;
- }
-
- public void setSubscriptionType(SubscriptionType subscriptionType) {
- this.subscriptionType = subscriptionType;
- }
-
- /**
- * @return the mapper to convert pulsar message to a storm tuple
- */
- public MessageToValuesMapper getMessageToValuesMapper() {
- return messageToValuesMapper;
- }
-
- /**
- * Sets the mapper to convert pulsar message to a storm tuple.
- * <p>
- * Note: If the mapper returns null, the message is not emitted to the collector and is acked immediately
- * </p>
- *
- * @param mapper
- */
- public void setMessageToValuesMapper(MessageToValuesMapper mapper) {
- this.messageToValuesMapper = Objects.requireNonNull(mapper);
- }
-
- /**
- *
- * @param unit
- * @return the timeout for retrying failed messages
- */
- public long getFailedRetriesTimeout(TimeUnit unit) {
- return unit.convert(failedRetriesTimeoutNano, TimeUnit.NANOSECONDS);
- }
-
- /**
- * Sets the timeout within which the spout will re-inject failed messages with an exponential backoff <i>(default:
- * 60 seconds)</i> Note: If set to 0, the message will not be retried when failed. If set to < 0, the message will
- * be retried forever till it is successfully processed or max message retry count is reached, whichever comes
- * first.
- *
- * @param failedRetriesTimeout
- * @param unit
- */
- public void setFailedRetriesTimeout(long failedRetriesTimeout, TimeUnit unit) {
- this.failedRetriesTimeoutNano = unit.toNanos(failedRetriesTimeout);
- }
-
- /**
- *
- * @return the maximum number of times a failed message will be retried
- */
- public int getMaxFailedRetries() {
- return maxFailedRetries;
- }
-
- /**
- * Sets the maximum number of times the spout will re-inject failed messages with an exponential backoff
- * <i>(default: -1)</i> Note: If set to 0, the message will not be retried when failed. If set to < 0, the message
- * will be retried forever till it is successfully processed or configured timeout expires, whichever comes first.
- *
- * @param maxFailedRetries
- */
- public void setMaxFailedRetries(int maxFailedRetries) {
- this.maxFailedRetries = maxFailedRetries;
- }
-
- /**
- *
- * @return if the consumer is shared across different executors of a spout
- */
- public boolean isSharedConsumerEnabled() {
- return sharedConsumerEnabled;
- }
-
- /**
- * Sets whether the consumer will be shared across different executors of a spout. <i>(default: false)</i>
- *
- * @param sharedConsumerEnabled
- */
- public void setSharedConsumerEnabled(boolean sharedConsumerEnabled) {
- this.sharedConsumerEnabled = sharedConsumerEnabled;
- }
-
- public boolean isAutoUnsubscribe() {
- return autoUnsubscribe;
- }
-
- /**
- * It unsubscribes the subscription when spout gets closed in the topology.
- *
- * @param autoUnsubscribe
- */
- public void setAutoUnsubscribe(boolean autoUnsubscribe) {
- this.autoUnsubscribe = autoUnsubscribe;
- }
-
- public boolean isDurableSubscription() {
- return durableSubscription;
- }
-
- /**
- * if subscription is not durable then it creates non-durable reader to start reading from the
- * {@link #setNonDurableSubscriptionReadPosition(MessagePosition)} in topic.
- *
- * @param durableSubscription
- */
- public void setDurableSubscription(boolean durableSubscription) {
- this.durableSubscription = durableSubscription;
- }
-
- public MessageId getNonDurableSubscriptionReadPosition() {
- return nonDurableSubscriptionReadPosition;
- }
-
- /**
- * Non-durable-subscription/Reader can be set to start reading from a specific position earliest/latest.
- *
- * @param nonDurableSubscriptionReadPosition
- */
- public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) {
- this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition;
- }
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
deleted file mode 100644
index 5502a62..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-public interface PulsarSpoutConsumer {
-
- /**
- * Receives a single message.
- *
- * @param waitTime
- * @param unit
- * @return
- * @throws PulsarClientException
- */
- Message<byte[]> receive(int waitTime, TimeUnit unit) throws PulsarClientException;
-
- /**
- * Ack the message async.
- *
- * @param msg
- */
- void acknowledgeAsync(Message<?> msg);
-
- /**
- * unsubscribe the consumer
- * @throws PulsarClientException
- */
- void unsubscribe() throws PulsarClientException;
-
- /**
- * Close the consumer
- *
- * @throws PulsarClientException
- */
- void close() throws PulsarClientException;
-
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java
deleted file mode 100644
index 3f0597b..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.io.Serializable;
-
-/**
- * Class used to specify pulsar storm configurations like service url and topic
- *
- *
- */
-public class PulsarStormConfiguration implements Serializable {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public static final int DEFAULT_METRICS_TIME_INTERVAL_IN_SECS = 60;
-
- private String serviceUrl = null;
- private String topic = null;
- private int metricsTimeIntervalInSecs = DEFAULT_METRICS_TIME_INTERVAL_IN_SECS;
-
- /**
- * @return the service URL to connect to from the client
- */
- public String getServiceUrl() {
- return serviceUrl;
- }
-
- /**
- * Sets the service URL to connect to from the client
- *
- * @param serviceUrl
- */
- public void setServiceUrl(String serviceUrl) {
- this.serviceUrl = serviceUrl;
- }
-
- /**
- * @return the topic name for the producer/consumer
- */
- public String getTopic() {
- return topic;
- }
-
- /**
- * Sets the topic name for the producer/consumer. It should be of the format
- * {persistent|non-persistent}://{property}/{cluster}/{namespace}/{topic}
- *
- * @param topic
- */
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- /**
- * @return the time interval in seconds for metrics generation
- */
- public int getMetricsTimeIntervalInSecs() {
- return metricsTimeIntervalInSecs;
- }
-
- /**
- * Sets the time interval in seconds for metrics generation <i>(default: 60 seconds)</i>
- *
- * @param metricsTimeIntervalInSecs
- */
- public void setMetricsTimeIntervalInSecs(int metricsTimeIntervalInSecs) {
- this.metricsTimeIntervalInSecs = metricsTimeIntervalInSecs;
- }
-
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
deleted file mode 100644
index b000827..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-
-import org.apache.storm.tuple.Values;
-
-/**
- * Returned by MessageToValuesMapper, this specifies the Values
- * for an output tuple and the stream it should be sent to.
- */
-public class PulsarTuple extends Values {
-
- protected final String outputStream;
-
- public PulsarTuple(String outStream, Object ... values) {
- super(values);
- outputStream = outStream;
- }
-
- /**
- * Return stream the tuple should be emitted on.
- *
- * @return String
- */
- public String getOutputStream() {
- return outputStream;
- }
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
deleted file mode 100644
index f4950c6..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SharedPulsarClient {
- private static final Logger LOG = LoggerFactory.getLogger(SharedPulsarClient.class);
- private static final ConcurrentMap<String, SharedPulsarClient> instances = new ConcurrentHashMap<>();
-
- private final String componentId;
- private final PulsarClientImpl client;
- private final AtomicInteger counter = new AtomicInteger();
-
- private Consumer<byte[]> consumer;
- private Reader<byte[]> reader;
- private Producer<byte[]> producer;
-
- private SharedPulsarClient(String componentId, ClientConfigurationData clientConf)
- throws PulsarClientException {
- this.client = new PulsarClientImpl(clientConf);
- this.componentId = componentId;
- }
-
- /**
- * Provides a shared pulsar client that is shared across all different tasks in the same component. Different
- * components will not share the pulsar client since they can have different configurations.
- *
- * @param componentId
- * the id of the spout/bolt
- * @param clientConf
- * @return
- * @throws PulsarClientException
- */
- public static SharedPulsarClient get(String componentId, ClientConfigurationData clientConf)
- throws PulsarClientException {
- AtomicReference<PulsarClientException> exception = new AtomicReference<PulsarClientException>();
- instances.computeIfAbsent(componentId, pulsarClient -> {
- SharedPulsarClient sharedPulsarClient = null;
- try {
- sharedPulsarClient = new SharedPulsarClient(componentId, clientConf);
- LOG.info("[{}] Created a new Pulsar Client.", componentId);
- } catch (PulsarClientException e) {
- exception.set(e);
- }
- return sharedPulsarClient;
- });
- if (exception.get() != null) {
- throw exception.get();
- }
- return instances.get(componentId);
- }
-
- public PulsarClientImpl getClient() {
- counter.incrementAndGet();
- return client;
- }
-
- public Consumer<byte[]> getSharedConsumer(ConsumerConfigurationData<byte[]> consumerConf)
- throws PulsarClientException {
- counter.incrementAndGet();
- synchronized (this) {
- if (consumer == null) {
- try {
- consumer = client.subscribeAsync(consumerConf).join();
- } catch (CompletionException e) {
- throw (PulsarClientException) e.getCause();
- }
- LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, consumerConf.getSingleTopic());
- } else {
- LOG.info("[{}] Using a shared consumer on {}", componentId, consumerConf.getSingleTopic());
- }
- }
- return consumer;
- }
-
- public Reader<byte[]> getSharedReader(ReaderConfigurationData<byte[]> readerConf) throws PulsarClientException {
- counter.incrementAndGet();
- synchronized (this) {
- if (reader == null) {
- try {
- reader = client.createReaderAsync(readerConf).join();
- } catch (CompletionException e) {
- throw (PulsarClientException) e.getCause();
- }
- LOG.info("[{}] Created a new Pulsar reader on {}", componentId, readerConf.getTopicName());
- } else {
- LOG.info("[{}] Using a shared reader on {}", componentId, readerConf.getTopicName());
- }
- }
- return reader;
- }
-
- public Producer<byte[]> getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException {
- counter.incrementAndGet();
- synchronized (this) {
- if (producer == null) {
- try {
- producer = client.createProducerAsync(producerConf).join();
- } catch (CompletionException e) {
- throw (PulsarClientException) e.getCause();
- }
- LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, producerConf.getTopicName());
- } else {
- LOG.info("[{}] Using a shared producer on {}", componentId, producerConf.getTopicName());
- }
- }
- return producer;
- }
-
- public void close() throws PulsarClientException {
- if (counter.decrementAndGet() <= 0) {
- if (client != null) {
- client.close();
- instances.remove(componentId);
- LOG.info("[{}] Closed Pulsar Client", componentId);
- }
- }
- }
-
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
deleted file mode 100644
index 452e0ce..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.io.Serializable;
-
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-
-public interface TupleToMessageMapper extends Serializable {
-
- /**
- * Convert tuple to {@link org.apache.pulsar.client.api.Message}.
- *
- * @param tuple
- * @return
- * @deprecated use {@link #toMessage(TypedMessageBuilder, Tuple)}
- */
- @Deprecated
- default Message<byte[]> toMessage(Tuple tuple) {
- return null;
- }
-
- /**
- * Set the value on a message builder to prepare the message to be published from the Bolt.
- *
- * @param tuple
- * @return
- */
- default TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
- // Default implementation provided for backward compatibility
- Message<byte[]> msg = toMessage(tuple);
- msgBuilder.value(msg.getData())
- .properties(msg.getProperties());
- if (msg.hasKey()) {
- msgBuilder.key(msg.getKey());
- }
- return msgBuilder;
- }
-
-
- /**
- * Declare the output schema for the bolt.
- *
- * @param declarer
- */
- public void declareOutputFields(OutputFieldsDeclarer declarer);
-}
diff --git a/pulsar-storm/src/main/javadoc/overview.html b/pulsar-storm/src/main/javadoc/overview.html
deleted file mode 100644
index a1595eb..0000000
--- a/pulsar-storm/src/main/javadoc/overview.html
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
-<HTML>
- <HEAD>
- <TITLE>Pulsar Storm API Overview</TITLE>
- </HEAD>
- <BODY>
- The Pulsar Storm API is a proprietary messaging API.
- </BODY>
-</HTML>
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
deleted file mode 100644
index e6cbc51..0000000
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Values;
-import org.mockito.ArgumentCaptor;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.Maps;
-
-public class PulsarSpoutTest {
-
- @Test
- public void testAckFailedMessage() throws Exception {
-
- PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
- conf.setServiceUrl("http://localhost:8080");
- conf.setSubscriptionName("sub1");
- conf.setTopic("persistent://prop/ns1/topic1");
- conf.setSubscriptionType(SubscriptionType.Exclusive);
- conf.setMessageToValuesMapper(new MessageToValuesMapper() {
- @Override
- public Values toValues(Message<byte[]> msg) {
- return null;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- });
-
- ClientBuilder builder = spy(new ClientBuilderImpl());
- PulsarSpout spout = spy(new PulsarSpout(conf, builder));
-
- Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
- new byte[0], Schema.BYTES, new MessageMetadata());
- Consumer<byte[]> consumer = mock(Consumer.class);
- SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
- CompletableFuture<Void> future = new CompletableFuture<>();
- future.complete(null);
- doReturn(future).when(consumer).acknowledgeAsync(msg.getMessageId());
- Field consField = PulsarSpout.class.getDeclaredField("consumer");
- consField.setAccessible(true);
- consField.set(spout, spoutConsumer);
-
- spout.fail(msg);
- spout.ack(msg);
- spout.emitNextAvailableTuple();
- verify(consumer, atLeast(1)).receive(anyInt(), any());
- }
-
- @Test
- public void testPulsarTuple() throws Exception {
- testPulsarSpout(true);
- }
-
- @Test
- public void testPulsarSpout() throws Exception {
- testPulsarSpout(false);
- }
-
- public void testPulsarSpout(boolean pulsarTuple) throws Exception {
- PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
- conf.setServiceUrl("http://localhost:8080");
- conf.setSubscriptionName("sub1");
- conf.setTopic("persistent://prop/ns1/topic1");
- conf.setSubscriptionType(SubscriptionType.Exclusive);
- conf.setSharedConsumerEnabled(true);
- AtomicBoolean called = new AtomicBoolean(false);
- conf.setMessageToValuesMapper(new MessageToValuesMapper() {
- @Override
- public Values toValues(Message<byte[]> msg) {
- called.set(true);
- if ("message to be dropped".equals(new String(msg.getData()))) {
- return null;
- }
- String val = new String(msg.getData());
- if (val.startsWith("stream:")) {
- String stream = val.split(":")[1];
- return new PulsarTuple(stream, val);
- }
- return new Values(val);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- });
-
- String msgContent = pulsarTuple ? "stream:pstream" : "test";
-
- ClientBuilder builder = spy(new ClientBuilderImpl());
- PulsarSpout spout = spy(new PulsarSpout(conf, builder));
- TopologyContext context = mock(TopologyContext.class);
- final String componentId = "test-component-id";
- doReturn(componentId).when(context).getThisComponentId();
- SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
- Map config = new HashMap<>();
- Field field = SharedPulsarClient.class.getDeclaredField("instances");
- field.setAccessible(true);
- ConcurrentMap<String, SharedPulsarClient> instances = (ConcurrentMap<String, SharedPulsarClient>) field
- .get(SharedPulsarClient.class);
-
- SharedPulsarClient client = mock(SharedPulsarClient.class);
- Consumer<byte[]> consumer = mock(Consumer.class);
- when(client.getSharedConsumer(any())).thenReturn(consumer);
- instances.put(componentId, client);
-
- Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
- msgContent.getBytes(), Schema.BYTES, new MessageMetadata());
- when(consumer.receive(anyInt(), any())).thenReturn(msg);
-
- spout.open(config, context, collector);
- spout.emitNextAvailableTuple();
-
- assertTrue(called.get());
- verify(consumer, atLeast(1)).receive(anyInt(), any());
- ArgumentCaptor<Values> capt = ArgumentCaptor.forClass(Values.class);
- if (pulsarTuple) {
- verify(collector, times(1)).emit(eq("pstream"), capt.capture(), eq(msg));
- } else {
- verify(collector, times(1)).emit(capt.capture(), eq(msg));
- }
- Values vals = capt.getValue();
- assertEquals(msgContent, vals.get(0));
- }
-
-}
diff --git a/tests/pom.xml b/tests/pom.xml
index 91b3416..188a296 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -33,7 +33,6 @@
<name>Apache Pulsar Adapters :: Tests</name>
<modules>
<module>pulsar-kafka-compat-client-test</module>
- <module>pulsar-storm-test</module>
<module>pulsar-spark-test</module>
</modules>
<build>
diff --git a/tests/pulsar-storm-test/pom.xml b/tests/pulsar-storm-test/pom.xml
deleted file mode 100644
index 3134328..0000000
--- a/tests/pulsar-storm-test/pom.xml
+++ /dev/null
@@ -1,131 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.pulsar.tests</groupId>
- <artifactId>adapters-tests-parent</artifactId>
- <version>2.11.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>pulsar-storm-test</artifactId>
- <packaging>jar</packaging>
- <name>Pulsar Storm adapter Tests</name>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-storm</artifactId>
- <version>2.11.0-SNAPSHOT</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>buildtools</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-broker</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-broker</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>testmocks</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.asynchttpclient</groupId>
- <artifactId>async-http-client</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- </dependencies>
-</project>
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
deleted file mode 100644
index 4355ad6..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.storm.task.IOutputCollector;
-import org.apache.storm.tuple.Tuple;
-
-public class MockOutputCollector implements IOutputCollector {
-
- private boolean acked = false;
- private boolean failed = false;
- private Throwable lastError = null;
- private Tuple ackedTuple = null;
- private int numTuplesAcked = 0;
-
- @Override
- public void reportError(Throwable error) {
- lastError = error;
- }
-
- @Override
- public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
- return null;
- }
-
- @Override
- public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
- }
-
- @Override
- public void ack(Tuple input) {
- acked = true;
- failed = false;
- ackedTuple = input;
- ++numTuplesAcked;
- }
-
- @Override
- public void fail(Tuple input) {
- failed = true;
- acked = false;
- }
-
- @Override
- public void resetTimeout(Tuple tuple) {
-
- }
-
- public boolean acked() {
- return acked;
- }
-
- public boolean failed() {
- return failed;
- }
-
- public Throwable getLastError() {
- return lastError;
- }
-
- public Tuple getAckedTuple() {
- return ackedTuple;
- }
-
- public int getNumTuplesAcked() {
- return numTuplesAcked;
- }
-
- public void reset() {
- acked = false;
- failed = false;
- lastError = null;
- ackedTuple = null;
- numTuplesAcked = 0;
- }
-
- @Override
- public void flush() {
- // Nothing to flush from buffer
- }
-
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
deleted file mode 100644
index 98c8d20..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.pulsar.client.api.Message;
-
-import org.apache.storm.spout.ISpoutOutputCollector;
-
-public class MockSpoutOutputCollector implements ISpoutOutputCollector {
-
- private boolean emitted = false;
- private Message lastMessage = null;
- private String data = null;
-
- @Override
- public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
- emitted = true;
- data = (String) tuple.get(0);
- lastMessage = (Message) messageId;
- return new ArrayList<Integer>();
- }
-
- @Override
- public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
- emitted = true;
- data = (String) tuple.get(0);
- lastMessage = (Message) messageId;
- }
-
- @Override
- public long getPendingCount() {
- return 0;
- }
-
- @Override
- public void reportError(Throwable error) {
- }
-
- public boolean emitted() {
- return emitted;
- }
-
- public String getTupleData() {
- return data;
- }
-
- public Message getLastMessage() {
- return lastMessage;
- }
-
- public void reset() {
- emitted = false;
- data = null;
- lastMessage = null;
- }
-
- @Override
- public void flush() {
- // Nothing to flush from buffer
- }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
deleted file mode 100644
index b90e855..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.fail;
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-public class PulsarBoltTest extends ProducerConsumerBase {
-
- private static final int NO_OF_RETRIES = 10;
-
- public String serviceUrl;
- public final String topic = "persistent://my-property/my-ns/my-topic1";
- public final String subscriptionName = "my-subscriber-name";
-
- protected PulsarBoltConfiguration pulsarBoltConf;
- protected PulsarBolt bolt;
- protected MockOutputCollector mockCollector;
- protected Consumer consumer;
-
- @Override
- @BeforeMethod
- public void beforeMethod(Method m) throws Exception {
- super.beforeMethod(m);
- setup();
- }
-
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
-
- serviceUrl = pulsar.getWebServiceAddress();
-
- pulsarBoltConf = new PulsarBoltConfiguration();
- pulsarBoltConf.setServiceUrl(serviceUrl);
- pulsarBoltConf.setTopic(topic);
- pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
- pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
- bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
- mockCollector = new MockOutputCollector();
- OutputCollector collector = new OutputCollector(mockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName);
- when(context.getThisTaskId()).thenReturn(0);
- bolt.prepare(Maps.newHashMap(), context, collector);
- consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName).subscribe();
- }
-
- @AfterMethod
- public void cleanup() throws Exception {
- bolt.close();
- consumer.close();
- super.internalCleanup();
- }
-
- @SuppressWarnings("serial")
- static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
-
- @Override
- public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
- if ("message to be dropped".equals(new String(tuple.getBinary(0)))) {
- return null;
- }
- if ("throw exception".equals(new String(tuple.getBinary(0)))) {
- throw new RuntimeException();
- }
- return msgBuilder.value(tuple.getBinary(0));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
- };
-
- private Tuple getMockTuple(String msgContent) {
- Tuple mockTuple = mock(Tuple.class);
- when(mockTuple.getBinary(0)).thenReturn(msgContent.getBytes());
- when(mockTuple.getSourceComponent()).thenReturn("");
- when(mockTuple.getSourceStreamId()).thenReturn("");
- return mockTuple;
- }
-
- @Test
- public void testBasic() throws Exception {
- String msgContent = "hello world";
- Tuple tuple = getMockTuple(msgContent);
- bolt.execute(tuple);
- for (int i = 0; i < NO_OF_RETRIES; i++) {
- Thread.sleep(1000);
- if (mockCollector.acked()) {
- break;
- }
- }
- Assert.assertTrue(mockCollector.acked());
- Assert.assertFalse(mockCollector.failed());
- Assert.assertNull(mockCollector.getLastError());
- Assert.assertEquals(tuple, mockCollector.getAckedTuple());
- Message msg = consumer.receive(5, TimeUnit.SECONDS);
- consumer.acknowledge(msg);
- Assert.assertEquals(msgContent, new String(msg.getData()));
- }
-
- @Test
- public void testExecuteFailure() throws Exception {
- String msgContent = "throw exception";
- Tuple tuple = getMockTuple(msgContent);
- bolt.execute(tuple);
- Assert.assertFalse(mockCollector.acked());
- Assert.assertTrue(mockCollector.failed());
- Assert.assertNotNull(mockCollector.getLastError());
- }
-
- @Test
- public void testNoMessageSend() throws Exception {
- String msgContent = "message to be dropped";
- Tuple tuple = getMockTuple(msgContent);
- bolt.execute(tuple);
- Assert.assertTrue(mockCollector.acked());
- Message msg = consumer.receive(5, TimeUnit.SECONDS);
- Assert.assertNull(msg);
- }
-
- @Test
- public void testMetrics() throws Exception {
- bolt.resetMetrics();
- String msgContent = "hello world";
- Tuple tuple = getMockTuple(msgContent);
- for (int i = 0; i < 10; i++) {
- bolt.execute(tuple);
- }
- for (int i = 0; i < NO_OF_RETRIES; i++) {
- Thread.sleep(1000);
- if (mockCollector.getNumTuplesAcked() == 10) {
- break;
- }
- }
- @SuppressWarnings("rawtypes")
- Map metrics = (Map) bolt.getValueAndReset();
- Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 10);
- Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_RATE)).doubleValue(),
- 10.0 / pulsarBoltConf.getMetricsTimeIntervalInSecs());
- Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_THROUGHPUT_BYTES)).doubleValue(),
- ((double) msgContent.getBytes().length * 10) / pulsarBoltConf.getMetricsTimeIntervalInSecs());
- metrics = bolt.getMetrics();
- Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 0);
- for (int i = 0; i < 10; i++) {
- Message msg = consumer.receive(5, TimeUnit.SECONDS);
- consumer.acknowledge(msg);
- }
- }
-
- @Test
- public void testSharedProducer() throws Exception {
- TopicStats topicStats = admin.topics().getStats(topic);
- Assert.assertEquals(topicStats.getPublishers().size(), 1);
- PulsarBolt otherBolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
- MockOutputCollector otherMockCollector = new MockOutputCollector();
- OutputCollector collector = new OutputCollector(otherMockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName);
- when(context.getThisTaskId()).thenReturn(1);
- otherBolt.prepare(Maps.newHashMap(), context, collector);
-
- topicStats = admin.topics().getStats(topic);
- Assert.assertEquals(topicStats.getPublishers().size(), 1);
-
- otherBolt.close();
-
- topicStats = admin.topics().getStats(topic);
- Assert.assertEquals(topicStats.getPublishers().size(), 1);
- }
-
- @Test
- public void testSerializability() throws Exception {
- // test serializability with no auth
- PulsarBolt boltWithNoAuth = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
- TestUtil.testSerializability(boltWithNoAuth);
- }
-
- @Test
- public void testFailedProducer() {
- PulsarBoltConfiguration pulsarBoltConf = new PulsarBoltConfiguration();
- pulsarBoltConf.setServiceUrl(serviceUrl);
- pulsarBoltConf.setTopic("persistent://invalid");
- pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
- pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
- PulsarBolt bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
- MockOutputCollector mockCollector = new MockOutputCollector();
- OutputCollector collector = new OutputCollector(mockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("new" + methodName);
- when(context.getThisTaskId()).thenReturn(0);
- try {
- bolt.prepare(Maps.newHashMap(), context, collector);
- fail("should have failed as producer creation failed");
- } catch (IllegalStateException ie) {
- // Ok.
- }
- }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
deleted file mode 100644
index 322e41b..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Values;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-public class PulsarSpoutTest extends ProducerConsumerBase {
-
- public String serviceUrl;
- public final String topic = "persistent://my-property/my-ns/my-topic1";
- public final String subscriptionName = "my-subscriber-name";
-
- protected PulsarSpoutConfiguration pulsarSpoutConf;
- protected PulsarSpout spout;
- protected MockSpoutOutputCollector mockCollector;
- protected Producer producer;
-
- @Override
- @BeforeMethod
- public void beforeMethod(Method m) throws Exception {
- super.beforeMethod(m);
- setup();
- }
-
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
-
- serviceUrl = pulsar.getWebServiceAddress();
-
- pulsarSpoutConf = new PulsarSpoutConfiguration();
- pulsarSpoutConf.setServiceUrl(serviceUrl);
- pulsarSpoutConf.setTopic(topic);
- pulsarSpoutConf.setSubscriptionName(subscriptionName);
- pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
- pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
- pulsarSpoutConf.setMaxFailedRetries(2);
- pulsarSpoutConf.setSharedConsumerEnabled(true);
- pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
- pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared);
- spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
- mockCollector = new MockSpoutOutputCollector();
- SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
- when(context.getThisTaskId()).thenReturn(0);
- spout.open(Maps.newHashMap(), context, collector);
- producer = pulsarClient.newProducer().topic(topic).create();
- }
-
- @AfterMethod
- public void cleanup() throws Exception {
- producer.close();
- spout.close();
- super.internalCleanup();
- }
-
- @SuppressWarnings("serial")
- public static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
-
- @Override
- public Values toValues(Message msg) {
- if ("message to be dropped".equals(new String(msg.getData()))) {
- return null;
- }
- return new Values(new String(msg.getData()));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
- };
-
- @Test
- public void testBasic() throws Exception {
- String msgContent = "hello world";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.ack(mockCollector.getLastMessage());
- }
-
- @Test
- public void testRedeliverOnFail() throws Exception {
- String msgContent = "hello world";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- spout.fail(mockCollector.getLastMessage());
- mockCollector.reset();
- Thread.sleep(150);
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.ack(mockCollector.getLastMessage());
- }
-
- @Test
- public void testNoRedeliverOnAck() throws Exception {
- String msgContent = "hello world";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- spout.ack(mockCollector.getLastMessage());
- mockCollector.reset();
- spout.nextTuple();
- assertFalse(mockCollector.emitted());
- assertNull(mockCollector.getTupleData());
- }
-
- @Test
- public void testLimitedRedeliveriesOnTimeout() throws Exception {
- String msgContent = "chuck norris";
- producer.send(msgContent.getBytes());
-
- long startTime = System.currentTimeMillis();
- while (startTime + pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.MILLISECONDS) > System
- .currentTimeMillis()) {
- mockCollector.reset();
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.fail(mockCollector.getLastMessage());
- // wait to avoid backoff
- Thread.sleep(500);
- }
- spout.nextTuple();
- spout.fail(mockCollector.getLastMessage());
- mockCollector.reset();
- Thread.sleep(500);
- spout.nextTuple();
- assertFalse(mockCollector.emitted());
- assertNull(mockCollector.getTupleData());
- }
-
- @Test
- public void testLimitedRedeliveriesOnCount() throws Exception {
- String msgContent = "hello world";
- producer.send(msgContent.getBytes());
-
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.fail(mockCollector.getLastMessage());
-
- mockCollector.reset();
- Thread.sleep(150);
-
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.fail(mockCollector.getLastMessage());
-
- mockCollector.reset();
- Thread.sleep(300);
-
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.fail(mockCollector.getLastMessage());
-
- mockCollector.reset();
- Thread.sleep(500);
- spout.nextTuple();
- assertFalse(mockCollector.emitted());
- assertNull(mockCollector.getTupleData());
- }
-
- @Test
- public void testBackoffOnRetry() throws Exception {
- String msgContent = "chuck norris";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- spout.fail(mockCollector.getLastMessage());
- mockCollector.reset();
- // due to backoff we should not get the message again immediately
- spout.nextTuple();
- assertFalse(mockCollector.emitted());
- assertNull(mockCollector.getTupleData());
- Thread.sleep(100);
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.ack(mockCollector.getLastMessage());
- }
-
- @Test
- public void testMessageDrop() throws Exception {
- String msgContent = "message to be dropped";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- assertFalse(mockCollector.emitted());
- assertNull(mockCollector.getTupleData());
- }
-
- @SuppressWarnings({ "rawtypes" })
- @Test
- public void testMetrics() throws Exception {
- spout.resetMetrics();
- String msgContent = "hello world";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- Map metrics = spout.getMetrics();
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
- assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_RATE)).doubleValue(),
- 1.0 / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
- assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_THROUGHPUT_BYTES)).doubleValue(),
- ((double) msgContent.getBytes().length) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
- spout.fail(mockCollector.getLastMessage());
- metrics = spout.getMetrics();
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
- Thread.sleep(150);
- spout.nextTuple();
- metrics = spout.getMetrics();
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
- spout.ack(mockCollector.getLastMessage());
- metrics = (Map) spout.getValueAndReset();
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
- }
-
- @Test
- public void testSharedConsumer() throws Exception {
- TopicStats topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
- PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
- MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
- SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
- when(context.getThisTaskId()).thenReturn(1);
- otherSpout.open(Maps.newHashMap(), context, collector);
-
- topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
-
- otherSpout.close();
-
- topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
- }
-
- @Test
- public void testNoSharedConsumer() throws Exception {
- TopicStats topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
- pulsarSpoutConf.setSharedConsumerEnabled(false);
- PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
- MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
- SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
- when(context.getThisTaskId()).thenReturn(1);
- otherSpout.open(Maps.newHashMap(), context, collector);
-
- topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 2);
-
- otherSpout.close();
-
- topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
- }
-
- @Test
- public void testSerializability() throws Exception {
- // test serializability with no auth
- PulsarSpout spoutWithNoAuth = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
- TestUtil.testSerializability(spoutWithNoAuth);
- }
-
- @Test
- public void testFailedConsumer() {
- PulsarSpoutConfiguration pulsarSpoutConf = new PulsarSpoutConfiguration();
- pulsarSpoutConf.setServiceUrl(serviceUrl);
- pulsarSpoutConf.setTopic("persistent://invalidTopic");
- pulsarSpoutConf.setSubscriptionName(subscriptionName);
- pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
- pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
- pulsarSpoutConf.setMaxFailedRetries(2);
- pulsarSpoutConf.setSharedConsumerEnabled(false);
- pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
- pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared);
- PulsarSpout spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
- MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
- SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("new-test" + methodName);
- when(context.getThisTaskId()).thenReturn(0);
- try {
- spout.open(Maps.newHashMap(), context, collector);
- fail("should have failed as consumer creation failed");
- } catch (IllegalStateException e) {
- // Ok
- }
- }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
deleted file mode 100644
index a71e088..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-
-import org.testng.Assert;
-
-public class TestUtil {
-
- public static void testSerializability(Object object) throws Exception {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(out);
- oos.writeObject(object);
- oos.close();
- Assert.assertTrue(out.toByteArray().length > 0);
- }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
deleted file mode 100644
index 93404ea..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm.example;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.storm.MessageToValuesMapper;
-import org.apache.pulsar.storm.PulsarBolt;
-import org.apache.pulsar.storm.PulsarBoltConfiguration;
-import org.apache.pulsar.storm.PulsarSpout;
-import org.apache.pulsar.storm.PulsarSpoutConfiguration;
-import org.apache.pulsar.storm.TupleToMessageMapper;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.metric.api.IMetricsConsumer;
-import org.apache.storm.task.IErrorReporter;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StormExample {
- private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class);
- private static final String serviceUrl = "http://broker-pdev.messaging.corp.usw.example.com:8080";
-
- @SuppressWarnings("serial")
- static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
-
- @Override
- public Values toValues(Message msg) {
- return new Values(new String(msg.getData()));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // declare the output fields
- declarer.declare(new Fields("string"));
- }
- };
-
- @SuppressWarnings("serial")
- static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
-
- @Override
- public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
- String receivedMessage = tuple.getString(0);
- // message processing
- String processedMsg = receivedMessage + "-processed";
- return msgBuilder.value(processedMsg.getBytes());
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // declare the output fields
- }
- };
-
- public static void main(String[] args) throws Exception {
- // String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
- // String authParams = "key1:val1,key2:val2";
- // clientConf.setAuthentication(authPluginClassName, authParams);
-
- String topic1 = "persistent://my-property/use/my-ns/my-topic1";
- String topic2 = "persistent://my-property/use/my-ns/my-topic2";
- String subscriptionName1 = "my-subscriber-name1";
- String subscriptionName2 = "my-subscriber-name2";
-
- // create spout
- PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
- spoutConf.setServiceUrl(serviceUrl);
- spoutConf.setTopic(topic1);
- spoutConf.setSubscriptionName(subscriptionName1);
- spoutConf.setMessageToValuesMapper(messageToValuesMapper);
- PulsarSpout spout = new PulsarSpout(spoutConf, PulsarClient.builder());
-
- // create bolt
- PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
- boltConf.setServiceUrl(serviceUrl);
- boltConf.setTopic(topic2);
- boltConf.setTupleToMessageMapper(tupleToMessageMapper);
- PulsarBolt bolt = new PulsarBolt(boltConf, PulsarClient.builder());
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("testSpout", spout);
- builder.setBolt("testBolt", bolt).shuffleGrouping("testSpout");
-
- Config conf = new Config();
- conf.setNumWorkers(2);
- conf.setDebug(true);
- conf.registerMetricsConsumer(PulsarMetricsConsumer.class);
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(10000);
-
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
- // create a consumer on topic2 to receive messages from the bolt when the processing is done
- Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic2).subscriptionName(subscriptionName2).subscribe();
- // create a producer on topic1 to send messages that will be received by the spout
- Producer<byte[]> producer = pulsarClient.newProducer().topic(topic1).create();
-
- for (int i = 0; i < 10; i++) {
- String msg = "msg-" + i;
- producer.send(msg.getBytes());
- LOG.info("Message {} sent", msg);
- }
- Message<byte[]> msg = null;
- for (int i = 0; i < 10; i++) {
- msg = consumer.receive(1, TimeUnit.SECONDS);
- LOG.info("Message {} received", new String(msg.getData()));
- }
- cluster.killTopology("test");
- cluster.shutdown();
-
- }
-
- class PulsarMetricsConsumer implements IMetricsConsumer {
-
- @Override
- public void prepare(Map stormConf, Object registrationArgument, TopologyContext context,
- IErrorReporter errorReporter) {
- }
-
- @Override
- public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
- // The collection will contain metrics for all the spouts/bolts that register the metrics in the topology.
- // The name for the Pulsar Spout is "PulsarSpoutMetrics-{componentId}-{taskIndex}" and for the Pulsar Bolt
- // is
- // "PulsarBoltMetrics-{componentId}-{taskIndex}".
- }
-
- @Override
- public void cleanup() {
- }
-
- }
-}