You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ay...@apache.org on 2023/03/29 23:09:40 UTC

[pulsar-adapters] branch master updated: Storm adapter moved to Apache Storm community (#47)

This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
     new ab537c1  Storm adapter moved to Apache Storm community (#47)
ab537c1 is described below

commit ab537c13061b06de2044cd43965de930d762fe8a
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Wed Mar 29 16:09:33 2023 -0700

    Storm adapter moved to Apache Storm community (#47)
---
 .asf.yaml                                          |   3 +-
 README.md                                          |   4 +-
 pom.xml                                            |  19 -
 pulsar-storm/pom.xml                               | 102 -----
 .../apache/pulsar/storm/MessageToValuesMapper.java |  44 --
 .../java/org/apache/pulsar/storm/PulsarBolt.java   | 209 ---------
 .../pulsar/storm/PulsarBoltConfiguration.java      |  57 ---
 .../java/org/apache/pulsar/storm/PulsarSpout.java  | 485 ---------------------
 .../pulsar/storm/PulsarSpoutConfiguration.java     | 195 ---------
 .../apache/pulsar/storm/PulsarSpoutConsumer.java   |  58 ---
 .../pulsar/storm/PulsarStormConfiguration.java     |  90 ----
 .../java/org/apache/pulsar/storm/PulsarTuple.java  |  45 --
 .../apache/pulsar/storm/SharedPulsarClient.java    | 153 -------
 .../apache/pulsar/storm/TupleToMessageMapper.java  |  66 ---
 pulsar-storm/src/main/javadoc/overview.html        |  29 --
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   | 178 --------
 tests/pom.xml                                      |   1 -
 tests/pulsar-storm-test/pom.xml                    | 131 ------
 .../apache/pulsar/storm/MockOutputCollector.java   | 101 -----
 .../pulsar/storm/MockSpoutOutputCollector.java     |  80 ----
 .../org/apache/pulsar/storm/PulsarBoltTest.java    | 236 ----------
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   | 349 ---------------
 .../java/org/apache/pulsar/storm/TestUtil.java     |  35 --
 .../apache/pulsar/storm/example/StormExample.java  | 166 -------
 24 files changed, 4 insertions(+), 2832 deletions(-)

diff --git a/.asf.yaml b/.asf.yaml
index 74589a2..c04e151 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -27,7 +27,6 @@ github:
     - streaming
     - queuing
     - event-streaming
-    - apache-storm
     - apache-spark
     - apache-kafka
   features:
@@ -48,4 +47,4 @@ github:
 notifications:
   commits:      commits@pulsar.apache.org
   issues:       commits@pulsar.apache.org
-  pullrequests: commits@pulsar.apache.org
\ No newline at end of file
+  pullrequests: commits@pulsar.apache.org
diff --git a/README.md b/README.md
index ce62338..33409df 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,8 @@ This repository is used for hosting all the adapters maintained and supported by
 
 [Apache Flink adapter](https://github.com/apache/flink-connector-pulsar) is supported and maintained by Apache Flink Community.
 
+[Apache Storm bolt and spout](https://github.com/apache/storm/tree/master/external/storm-pulsar) are supported by Apache Storm Community.
+
 ## Building
 
 In order to build this code you can simply use Maven
@@ -42,5 +44,5 @@ git checkout v2.11.0
 mvn clean install -DskipTests
 ```
 
-This is because this repository depends on test integration artifacts of the relative version on the main 
+This is because this repository depends on test integration artifacts of the relative version on the main
 Apache Pulsar codebase
diff --git a/pom.xml b/pom.xml
index 249585f..3527230 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,6 @@
 
   <properties>
     <pulsar.version>2.11.0</pulsar.version>
-    <storm.version>2.0.0</storm.version>
     <kafka-client.version>2.7.0</kafka-client.version>
     <kafka_0_8.version>0.8.1.1</kafka_0_8.version>
     <avro.version>1.10.2</avro.version>
@@ -139,7 +138,6 @@
   </properties>
 
   <modules>
-    <module>pulsar-storm</module>
     <module>pulsar-spark</module>
     <module>pulsar-client-kafka-compat</module>
     <module>pulsar-log4j2-appender</module>
@@ -251,22 +249,6 @@
         </exclusions>
       </dependency>
 
-      <dependency>
-        <groupId>org.apache.storm</groupId>
-        <artifactId>storm-client</artifactId>
-        <version>${storm.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.storm</groupId>
-        <artifactId>storm-server</artifactId>
-        <version>${storm.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.storm</groupId>
-        <artifactId>storm-core</artifactId>
-        <version>${storm.version}</version>
-      </dependency>
-
       <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.9.2</artifactId>
@@ -1028,4 +1010,3 @@
   </repositories>
 
 </project>
-
diff --git a/pulsar-storm/pom.xml b/pulsar-storm/pom.xml
deleted file mode 100644
index a20649b..0000000
--- a/pulsar-storm/pom.xml
+++ /dev/null
@@ -1,102 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
-  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar-adapters</artifactId>
-    <version>2.11.0-SNAPSHOT</version>
-    <relativePath>..</relativePath>
-  </parent>
-
-  <artifactId>pulsar-storm</artifactId>
-  <name>Pulsar Storm adapter</name>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-common</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.testng</groupId>
-      <artifactId>testng</artifactId>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.yaml</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.storm</groupId>
-      <artifactId>storm-client</artifactId>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>ch.qos.logback</groupId>
-          <artifactId>logback-classic</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>log4j-over-slf4j</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <resources>
-      <resource>
-        <directory>src/main/resources</directory>
-        <filtering>true</filtering>
-      </resource>
-    </resources>
-  </build>
-</project>
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
deleted file mode 100644
index 92e127c..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.io.Serializable;
-
-import org.apache.pulsar.client.api.Message;
-
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Values;
-
-public interface MessageToValuesMapper extends Serializable {
-
-    /**
-     * Convert {@link org.apache.pulsar.client.api.Message} to tuple values.
-     *
-     * @param msg
-     * @return
-     */
-    Values toValues(Message<byte[]> msg);
-
-    /**
-     * Declare the output schema for the spout.
-     *
-     * @param declarer
-     */
-    void declareOutputFields(OutputFieldsDeclarer declarer);
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
deleted file mode 100644
index d331ca2..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static java.lang.String.format;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class PulsarBolt extends BaseRichBolt implements IMetric {
-    /**
-     *
-     */
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarBolt.class);
-
-    public static final String NO_OF_MESSAGES_SENT = "numberOfMessagesSent";
-    public static final String PRODUCER_RATE = "producerRate";
-    public static final String PRODUCER_THROUGHPUT_BYTES = "producerThroughput";
-
-    private final ClientConfigurationData clientConf;
-    private final ProducerConfigurationData producerConf;
-    private final PulsarBoltConfiguration pulsarBoltConf;
-    private final ConcurrentMap<String, Object> metricsMap = new ConcurrentHashMap<>();
-
-    private SharedPulsarClient sharedPulsarClient;
-    private String componentId;
-    private String boltId;
-    private OutputCollector collector;
-    private Producer<byte[]> producer;
-    private volatile long messagesSent = 0;
-    private volatile long messageSizeSent = 0;
-
-    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf) {
-        this(pulsarBoltConf, PulsarClient.builder());
-    }
-
-    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder clientBuilder) {
-        this(pulsarBoltConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(),
-                new ProducerConfigurationData());
-    }
-
-    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfigurationData clientConf,
-            ProducerConfigurationData producerConf) {
-        checkNotNull(pulsarBoltConf, "bolt configuration can't be null");
-        checkNotNull(clientConf, "client configuration can't be null");
-        checkNotNull(producerConf, "producer configuration can't be null");
-        Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
-        Objects.requireNonNull(pulsarBoltConf.getTopic());
-        Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
-        this.pulsarBoltConf = pulsarBoltConf;
-        this.clientConf = clientConf;
-        this.producerConf = producerConf;
-        this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
-        this.producerConf.setTopicName(pulsarBoltConf.getTopic());
-        this.producerConf.setBatcherBuilder(null);
-    }
-    
-    @SuppressWarnings({ "rawtypes" })
-    @Override
-    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-        this.componentId = context.getThisComponentId();
-        this.boltId = String.format("%s-%s", componentId, context.getThisTaskId());
-        this.collector = collector;
-        try {
-            sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
-            producer = sharedPulsarClient.getSharedProducer(producerConf);
-            LOG.info("[{}] Created a pulsar producer on topic {} to send messages", boltId, pulsarBoltConf.getTopic());
-        } catch (PulsarClientException e) {
-            LOG.error("[{}] Error initializing pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic(), e);
-            throw new IllegalStateException(
-                    format("Failed to initialize producer for %s : %s", pulsarBoltConf.getTopic(), e.getMessage()), e);
-        }
-        context.registerMetric(String.format("PulsarBoltMetrics-%s-%s", componentId, context.getThisTaskIndex()), this,
-                pulsarBoltConf.getMetricsTimeIntervalInSecs());
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        if (TupleUtils.isTick(input)) {
-            collector.ack(input);
-            return;
-        }
-        try {
-            if (producer != null) {
-                // a message key can be provided in the mapper
-                TypedMessageBuilder<byte[]> msgBuilder = pulsarBoltConf.getTupleToMessageMapper()
-                        .toMessage(producer.newMessage(), input);
-                if (msgBuilder == null) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("[{}] Cannot send null message, acking the collector", boltId);
-                    }
-                    collector.ack(input);
-                } else {
-                    final long messageSizeToBeSent = ((TypedMessageBuilderImpl<byte[]>) msgBuilder).getContent()
-                            .remaining();
-                    msgBuilder.sendAsync().handle((msgId, ex) -> {
-                        synchronized (collector) {
-                            if (ex != null) {
-                                collector.reportError(ex);
-                                collector.fail(input);
-                                LOG.error("[{}] Message send failed", boltId, ex);
-
-                            } else {
-                                collector.ack(input);
-                                ++messagesSent;
-                                messageSizeSent += messageSizeToBeSent;
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("[{}] Message sent with id {}", boltId, msgId);
-                                }
-                            }
-                        }
-
-                        return null;
-                    });
-                }
-            }
-        } catch (Exception e) {
-            LOG.error("[{}] Message processing failed", boltId, e);
-            collector.reportError(e);
-            collector.fail(input);
-        }
-    }
-
-    public void close() {
-        try {
-            LOG.info("[{}] Closing Pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic());
-            if (sharedPulsarClient != null) {
-                sharedPulsarClient.close();
-            }
-        } catch (PulsarClientException e) {
-            LOG.error("[{}] Error closing Pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic(), e);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        close();
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        pulsarBoltConf.getTupleToMessageMapper().declareOutputFields(declarer);
-    }
-
-    /**
-     * Helpers for metrics
-     */
-
-    @SuppressWarnings({ "rawtypes" })
-    ConcurrentMap getMetrics() {
-        metricsMap.put(NO_OF_MESSAGES_SENT, messagesSent);
-        metricsMap.put(PRODUCER_RATE, ((double) messagesSent) / pulsarBoltConf.getMetricsTimeIntervalInSecs());
-        metricsMap.put(PRODUCER_THROUGHPUT_BYTES,
-                ((double) messageSizeSent) / pulsarBoltConf.getMetricsTimeIntervalInSecs());
-        return metricsMap;
-    }
-
-    void resetMetrics() {
-        messagesSent = 0;
-        messageSizeSent = 0;
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public Object getValueAndReset() {
-        ConcurrentMap metrics = getMetrics();
-        resetMetrics();
-        return metrics;
-    }
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
deleted file mode 100644
index 714e435..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.Objects;
-
-/**
- * Class used to specify Pulsar bolt configuration
- *
- *
- */
-public class PulsarBoltConfiguration extends PulsarStormConfiguration {
-
-    /**
-     *
-     */
-    private static final long serialVersionUID = 1L;
-
-    private TupleToMessageMapper tupleToMessageMapper = null;
-
-    /**
-     * @return the mapper to convert storm tuples to a pulsar message
-     */
-    public TupleToMessageMapper getTupleToMessageMapper() {
-        return tupleToMessageMapper;
-    }
-
-    /**
-     * Sets the mapper to convert storm tuples to a pulsar message
-     * <p>
-     * Note: If the mapper returns null, the message is not sent by the producer and is acked immediately on the
-     * collector
-     * </p>
-     *
-     * @param mapper
-     */
-    public void setTupleToMessageMapper(TupleToMessageMapper mapper) {
-        this.tupleToMessageMapper = Objects.requireNonNull(mapper);
-    }
-
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
deleted file mode 100644
index 63887a4..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ /dev/null
@@ -1,485 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static java.lang.String.format;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.impl.Backoff;
-import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PulsarSpout extends BaseRichSpout implements IMetric {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class);
-
-    public static final String NO_OF_PENDING_FAILED_MESSAGES = "numberOfPendingFailedMessages";
-    public static final String NO_OF_MESSAGES_RECEIVED = "numberOfMessagesReceived";
-    public static final String NO_OF_MESSAGES_EMITTED = "numberOfMessagesEmitted";
-    public static final String NO_OF_MESSAGES_FAILED = "numberOfMessagesFailed";
-    public static final String MESSAGE_NOT_AVAILABLE_COUNT = "messageNotAvailableCount";
-    public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
-    public static final String CONSUMER_RATE = "consumerRate";
-    public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
-
-    private final ClientConfigurationData clientConf;
-    private final PulsarSpoutConfiguration pulsarSpoutConf;
-    private final ConsumerConfigurationData<byte[]> consumerConf;
-    private final long failedRetriesTimeoutNano;
-    private final int maxFailedRetries;
-    private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = new ConcurrentHashMap<>();
-    private final Queue<Message<byte[]>> failedMessages = new ConcurrentLinkedQueue<>();
-    private final ConcurrentMap<String, Object> metricsMap = new ConcurrentHashMap<>();
-
-    private SharedPulsarClient sharedPulsarClient;
-    private String componentId;
-    private String spoutId;
-    private SpoutOutputCollector collector;
-    private PulsarSpoutConsumer consumer;
-    private volatile long messagesReceived = 0;
-    private volatile long messagesEmitted = 0;
-    private volatile long messagesFailed = 0;
-    private volatile long messageNotAvailableCount = 0;
-    private volatile long pendingAcks = 0;
-    private volatile long messageSizeReceived = 0;
-
-    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf) {
-        this(pulsarSpoutConf, PulsarClient.builder());
-    }
-    
-    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clientBuilder) {
-        this(pulsarSpoutConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(),
-                new ConsumerConfigurationData<byte[]>());
-    }
-
-    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfigurationData clientConfig,
-            ConsumerConfigurationData<byte[]> consumerConfig) {
-        Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
-        Objects.requireNonNull(pulsarSpoutConf.getTopic());
-        Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
-        Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
-
-        checkNotNull(pulsarSpoutConf, "spout configuration can't be null");
-        checkNotNull(clientConfig, "client configuration can't be null");
-        checkNotNull(consumerConfig, "consumer configuration can't be null");
-        this.clientConf = clientConfig;
-        this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
-        this.consumerConf = consumerConfig;
-        this.pulsarSpoutConf = pulsarSpoutConf;
-        this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
-        this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
-    }
-
-    @Override
-    public void close() {
-        try {
-            LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic());
-            
-            if (pulsarSpoutConf.isAutoUnsubscribe()) {
-                try {
-                    consumer.unsubscribe();    
-                }catch(PulsarClientException e) {
-                    LOG.error("[{}] Failed to unsubscribe {} on topic {}", spoutId,
-                            this.pulsarSpoutConf.getSubscriptionName(), pulsarSpoutConf.getTopic(), e);
-                }
-            }
-            
-            if (!pulsarSpoutConf.isSharedConsumerEnabled() && consumer != null) {
-                consumer.close();
-            }
-            if (sharedPulsarClient != null) {
-                sharedPulsarClient.close();
-            }
-            pendingMessageRetries.clear();
-            failedMessages.clear();
-        } catch (PulsarClientException e) {
-            LOG.error("[{}] Error closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic(), e);
-        }
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        if (msgId instanceof Message) {
-            Message<?> msg = (Message<?>) msgId;
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("[{}] Received ack for message {}", spoutId, msg.getMessageId());
-            }
-            consumer.acknowledgeAsync(msg);
-            pendingMessageRetries.remove(msg.getMessageId());
-            // we should also remove message from failedMessages but it will be eventually removed while emitting next
-            // tuple
-            --pendingAcks;
-        }
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        if (msgId instanceof Message) {
-            @SuppressWarnings("unchecked")
-            Message<byte[]> msg = (Message<byte[]>) msgId;
-            MessageId id = msg.getMessageId();
-            LOG.warn("[{}] Error processing message {}", spoutId, id);
-
-            // Since the message processing failed, we put it in the failed messages queue if there are more retries
-            // remaining for the message
-            MessageRetries messageRetries = pendingMessageRetries.computeIfAbsent(id, (k) -> new MessageRetries());
-            if ((failedRetriesTimeoutNano < 0
-                    || (messageRetries.getTimeStamp() + failedRetriesTimeoutNano) > System.nanoTime())
-                    && (maxFailedRetries < 0 || messageRetries.numRetries < maxFailedRetries)) {
-                // since we can retry again, we increment retry count and put it in the queue
-                LOG.info("[{}] Putting message {} in the retry queue", spoutId, id);
-                messageRetries.incrementAndGet();
-                pendingMessageRetries.putIfAbsent(id, messageRetries);
-                failedMessages.add(msg);
-                --pendingAcks;
-                messagesFailed++;
-            } else {
-                LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id);
-                ack(msg);
-            }
-        }
-
-    }
-
-    /**
-     * Emits a tuple received from the Pulsar consumer unless there are any failed messages
-     */
-    @Override
-    public void nextTuple() {
-        emitNextAvailableTuple();
-    }
-
-    /**
-     * Emits the next available non-null tuple to the topology.
-     * <p>
-     * A pending failed message, if any, takes priority. Otherwise messages are received from the consumer one at a
-     * time (waiting up to 100 ms for each) and mapped to tuples; messages whose mapping yields no tuple are skipped
-     * and the next one is tried. The method returns once a tuple was emitted or the consumer had nothing to deliver.
-     * </p>
-     */
-    public void emitNextAvailableTuple() {
-        // failed messages are re-emitted before anything new is read from the consumer
-        if (emitFailedMessage()) {
-            return;
-        }
-        if (consumer == null) {
-            return;
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("[{}] Receiving the next message from pulsar consumer to emit to the collector", spoutId);
-        }
-        try {
-            while (true) {
-                Message<byte[]> received = consumer.receive(100, TimeUnit.MILLISECONDS);
-                if (received == null) {
-                    // queue is empty and nothing to emit
-                    messageNotAvailableCount++;
-                    return;
-                }
-                ++messagesReceived;
-                messageSizeReceived += received.getData().length;
-                if (mapToValueAndEmit(received)) {
-                    // a tuple reached the collector, we are done for this call
-                    return;
-                }
-            }
-        } catch (PulsarClientException e) {
-            LOG.error("[{}] Error receiving message from pulsar consumer", spoutId, e);
-        }
-    }
-
-    /**
-     * Handles the head of the failed-message queue.
-     * <p>
-     * Entries that no longer have retry state (already acked) are discarded. For the first entry that still has
-     * retry state, the message is either re-emitted or, when it must back off, the thread sleeps for the initial
-     * backoff interval and the entry stays queued.
-     * </p>
-     *
-     * @return {@code true} if a failed message was handled (re-emitted or backed off), {@code false} if the queue
-     *         holds no retryable message
-     */
-    private boolean emitFailedMessage() {
-        for (Message<byte[]> head = failedMessages.peek(); head != null; head = failedMessages.peek()) {
-            MessageRetries retries = pendingMessageRetries.get(head.getMessageId());
-
-            if (retries == null) {
-                // no retry state left: the message was already acked, so drop it from the failed queue as well
-                // and look at the next entry
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("[{}]-{} removing {} from failedMessage because it's already acked",
-                            pulsarSpoutConf.getTopic(), spoutId, head.getMessageId());
-                }
-                failedMessages.remove();
-                continue;
-            }
-
-            if (!Backoff.shouldBackoff(retries.getTimeStamp(), TimeUnit.NANOSECONDS, retries.getNumRetries(),
-                    clientConf.getInitialBackoffIntervalNanos(), clientConf.getMaxBackoffIntervalNanos())) {
-                // no backoff required: take the message off the queue and emit it to the topology again
-                LOG.info("[{}] Retrying failed message {}", spoutId, head.getMessageId());
-                failedMessages.remove();
-                mapToValueAndEmit(head);
-            } else {
-                // too early to retry: wait and leave the message at the head of the queue
-                Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getInitialBackoffIntervalNanos()));
-            }
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Initializes the spout for a task: records the component id, spout id and collector, clears any retry state,
-     * creates the Pulsar consumer (or reader) and registers this spout as a metric with the topology context.
-     *
-     * @param conf
-     *            the Storm configuration (not read by this method)
-     * @param context
-     *            the topology context providing the component id, task id and task index
-     * @param collector
-     *            the collector tuples are emitted to
-     * @throws IllegalStateException
-     *             if the Pulsar consumer cannot be created
-     */
-    @Override
-    @SuppressWarnings({ "rawtypes" })
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        this.componentId = context.getThisComponentId();
-        this.spoutId = String.format("%s-%s", componentId, context.getThisTaskId());
-        this.collector = collector;
-        // start from a clean retry state
-        pendingMessageRetries.clear();
-        failedMessages.clear();
-        try {
-            consumer = createConsumer();
-            LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId,
-                    pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName());
-        } catch (PulsarClientException e) {
-            LOG.error("[{}] Error creating pulsar consumer on topic {}", spoutId, pulsarSpoutConf.getTopic(), e);
-            // surface the failure as unchecked, keeping the original exception as the cause
-            throw new IllegalStateException(format("Failed to initialize consumer for %s-%s : %s",
-                    pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName(), e.getMessage()), e);
-        }
-        // the spout itself is the metric object; the name uses the task index, not the task id
-        context.registerMetric(String.format("PulsarSpoutMetrics-%s-%s", componentId, context.getThisTaskIndex()), this,
-                pulsarSpoutConf.getMetricsTimeIntervalInSecs());
-    }
-
-    /**
-     * Creates the message source for this spout using the shared Pulsar client of the component.
-     * <p>
-     * A durable subscription yields a {@link SpoutConsumer}, a non-durable one a {@link SpoutReader}. When the shared
-     * consumer option is enabled, the consumer/reader instance held by the {@link SharedPulsarClient} is reused;
-     * otherwise a dedicated one is created.
-     * </p>
-     *
-     * @return the consumer abstraction messages are received from
-     * @throws PulsarClientException
-     *             if the client, consumer or reader cannot be created
-     */
-    private PulsarSpoutConsumer createConsumer() throws PulsarClientException {
-        sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
-        if (pulsarSpoutConf.isSharedConsumerEnabled()) {
-            return pulsarSpoutConf.isDurableSubscription()
-                    ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration()))
-                    : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration()));
-        }
-        try {
-            return pulsarSpoutConf.isDurableSubscription()
-                    ? new SpoutConsumer(sharedPulsarClient.getClient()
-                            .subscribeAsync(newConsumerConfiguration()).join())
-                    : new SpoutReader(sharedPulsarClient.getClient()
-                            .createReaderAsync(newReaderConfiguration()).join());
-        } catch (CompletionException e) {
-            // join() wraps the real failure; only cast when the cause really is a PulsarClientException so that
-            // an unexpected cause is reported instead of being masked by a ClassCastException
-            Throwable cause = e.getCause();
-            if (cause instanceof PulsarClientException) {
-                throw (PulsarClientException) cause;
-            }
-            throw new PulsarClientException(cause != null ? cause : e);
-        }
-    }
-
-    /**
-     * Declares the output fields of this spout by delegating to the configured message-to-values mapper.
-     *
-     * @param declarer
-     *            the declarer handed to the mapper
-     */
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        pulsarSpoutConf.getMessageToValuesMapper().declareOutputFields(declarer);
-
-    }
-
-    /**
-     * Maps a Pulsar message to tuple values and emits them, anchored with the message itself.
-     * <p>
-     * If the mapper yields {@code null} the message is acked right away and nothing is emitted. A
-     * {@link PulsarTuple} is emitted on the stream it names, any other values on the default stream.
-     * </p>
-     *
-     * @param msg
-     *            the message to map; {@code null} is ignored
-     * @return {@code true} if a tuple was emitted, {@code false} otherwise
-     */
-    private boolean mapToValueAndEmit(Message<byte[]> msg) {
-        if (msg == null) {
-            return false;
-        }
-
-        Values mapped = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
-        ++pendingAcks;
-
-        if (mapped == null) {
-            // since the mapper returned null, we can drop the message and ack it immediately
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("[{}] Dropping message {}", spoutId, msg.getMessageId());
-            }
-            ack(msg);
-            return false;
-        }
-
-        if (mapped instanceof PulsarTuple) {
-            collector.emit(((PulsarTuple) mapped).getOutputStream(), mapped, msg);
-        } else {
-            collector.emit(mapped, msg);
-        }
-        ++messagesEmitted;
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId());
-        }
-        return true;
-    }
-
-    /**
-     * Retry bookkeeping for a single failed message: the time this tracker was created and the number of retries
-     * counted so far. The counter is a plain {@code int}; no synchronization is performed here.
-     */
-    public class MessageRetries {
-        // System.nanoTime() value captured at construction; only meaningful relative to other nanoTime() readings
-        private final long timestampInNano;
-        private int numRetries;
-
-        public MessageRetries() {
-            this.timestampInNano = System.nanoTime();
-            this.numRetries = 0;
-        }
-
-        /**
-         * @return the {@link System#nanoTime()} reading taken when this tracker was created
-         */
-        public long getTimeStamp() {
-            return timestampInNano;
-        }
-
-        /**
-         * Increments the retry counter.
-         *
-         * @return the retry count after the increment
-         */
-        public int incrementAndGet() {
-            return ++numRetries;
-        }
-
-        /**
-         * @return the number of retries counted so far
-         */
-        public int getNumRetries() {
-            return numRetries;
-        }
-    }
-
-    /**
-     * Helpers for metrics
-     */
-
-    @SuppressWarnings({ "rawtypes" })
-    ConcurrentMap getMetrics() {
-        metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long) pendingMessageRetries.size());
-        metricsMap.put(NO_OF_MESSAGES_RECEIVED, messagesReceived);
-        metricsMap.put(NO_OF_MESSAGES_EMITTED, messagesEmitted);
-        metricsMap.put(NO_OF_MESSAGES_FAILED, messagesFailed);
-        metricsMap.put(MESSAGE_NOT_AVAILABLE_COUNT, messageNotAvailableCount);
-        metricsMap.put(NO_OF_PENDING_ACKS, pendingAcks);
-        metricsMap.put(CONSUMER_RATE, ((double) messagesReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
-        metricsMap.put(CONSUMER_THROUGHPUT_BYTES,
-                ((double) messageSizeReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
-        return metricsMap;
-    }
-
-    /**
-     * Resets the per-interval counters to zero. {@code pendingAcks} is not touched by this method.
-     */
-    void resetMetrics() {
-        messagesReceived = 0;
-        messagesEmitted = 0;
-        messageSizeReceived = 0;
-        messagesFailed = 0;
-        messageNotAvailableCount = 0;
-    }
-
-    /**
-     * Returns the current metrics and resets the per-interval counters.
-     * <p>
-     * The map is populated by {@link #getMetrics()} before {@link #resetMetrics()} runs, so the returned values
-     * reflect the state prior to the reset.
-     * </p>
-     *
-     * @return the metrics map
-     */
-    @SuppressWarnings("rawtypes")
-    @Override
-    public Object getValueAndReset() {
-        ConcurrentMap metrics = getMetrics();
-        resetMetrics();
-        return metrics;
-    }
-
-    /**
-     * Builds the reader configuration used for non-durable subscriptions.
-     * <p>
-     * Topic, reader name (the subscription name) and start position come from the spout configuration. When a
-     * consumer configuration was supplied, its crypto settings, read-compacted flag and receiver queue size are
-     * carried over.
-     * </p>
-     *
-     * @return a new reader configuration
-     */
-    private ReaderConfigurationData<byte[]> newReaderConfiguration() {
-        ReaderConfigurationData<byte[]> conf = new ReaderConfigurationData<>();
-        conf.setTopicName(pulsarSpoutConf.getTopic());
-        conf.setReaderName(pulsarSpoutConf.getSubscriptionName());
-        conf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition());
-
-        if (this.consumerConf == null) {
-            // nothing to inherit from a consumer configuration
-            return conf;
-        }
-
-        conf.setCryptoFailureAction(consumerConf.getCryptoFailureAction());
-        conf.setCryptoKeyReader(consumerConf.getCryptoKeyReader());
-        conf.setReadCompacted(consumerConf.isReadCompacted());
-        conf.setReceiverQueueSize(consumerConf.getReceiverQueueSize());
-        return conf;
-    }
-
-    /**
-     * Provides the consumer configuration used for durable subscriptions.
-     * <p>
-     * The configuration supplied to the spout is reused (and updated in place) when present; otherwise a new one is
-     * created. Topic, subscription name and subscription type are always taken from the spout configuration.
-     * </p>
-     *
-     * @return the consumer configuration to subscribe with
-     */
-    private ConsumerConfigurationData<byte[]> newConsumerConfiguration() {
-        ConsumerConfigurationData<byte[]> conf = this.consumerConf;
-        if (conf == null) {
-            conf = new ConsumerConfigurationData<>();
-        }
-        conf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
-        conf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
-        conf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
-        return conf;
-    }
-
-    /**
-     * {@link PulsarSpoutConsumer} backed by a Pulsar {@link Consumer}; every operation is forwarded to the wrapped
-     * consumer.
-     */
-    static class SpoutConsumer implements PulsarSpoutConsumer {
-        private Consumer<byte[]> consumer;
-
-        public SpoutConsumer(Consumer<byte[]> wrapped) {
-            this.consumer = wrapped;
-        }
-
-        /** Receives the next message from the wrapped consumer, waiting at most the given time. */
-        @Override
-        public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
-            return this.consumer.receive(timeout, unit);
-        }
-
-        /** Acknowledges the message asynchronously on the wrapped consumer. */
-        @Override
-        public void acknowledgeAsync(Message<?> msg) {
-            this.consumer.acknowledgeAsync(msg);
-        }
-
-        /** Unsubscribes the wrapped consumer. */
-        @Override
-        public void unsubscribe() throws PulsarClientException {
-            this.consumer.unsubscribe();
-        }
-
-        /** Closes the wrapped consumer. */
-        @Override
-        public void close() throws PulsarClientException {
-            this.consumer.close();
-        }
-
-    }
-
-    /**
-     * {@link PulsarSpoutConsumer} backed by a Pulsar {@link Reader}. Acknowledging and unsubscribing do nothing in
-     * this implementation.
-     */
-    static class SpoutReader implements PulsarSpoutConsumer {
-        private Reader<byte[]> reader;
-
-        public SpoutReader(Reader<byte[]> wrapped) {
-            this.reader = wrapped;
-        }
-
-        /** Reads the next message from the wrapped reader, waiting at most the given time. */
-        @Override
-        public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
-            return this.reader.readNext(timeout, unit);
-        }
-
-        @Override
-        public void acknowledgeAsync(Message<?> msg) {
-            // No-op
-        }
-
-        @Override
-        public void unsubscribe() throws PulsarClientException {
-            // No-op
-        }
-
-        /** Closes the wrapped reader, reporting an I/O failure as a {@link PulsarClientException}. */
-        @Override
-        public void close() throws PulsarClientException {
-            try {
-                this.reader.close();
-            } catch (IOException ioFailure) {
-                throw new PulsarClientException(ioFailure);
-            }
-        }
-    }
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
deleted file mode 100644
index db797ee..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.SubscriptionType;
-
-/**
- * Configuration of a Pulsar spout: subscription settings, the message-to-tuple mapper and the retry policy for
- * failed messages. Service URL, topic and metrics interval are inherited from {@link PulsarStormConfiguration}.
- */
-public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
-
-    /**
-     * Serialization version identifier.
-     */
-    private static final long serialVersionUID = 1L;
-
-    public static final long DEFAULT_FAILED_RETRIES_TIMEOUT_NANO = TimeUnit.SECONDS.toNanos(60);
-    public static final int DEFAULT_MAX_FAILED_RETRIES = -1;
-
-    private String subscriptionName = null;
-    private MessageToValuesMapper messageToValuesMapper = null;
-    private long failedRetriesTimeoutNano = DEFAULT_FAILED_RETRIES_TIMEOUT_NANO;
-    private int maxFailedRetries = DEFAULT_MAX_FAILED_RETRIES;
-    private boolean sharedConsumerEnabled = false;
-
-    private SubscriptionType subscriptionType = SubscriptionType.Shared;
-    private boolean autoUnsubscribe = false;
-    private boolean durableSubscription = true;
-    // read position if non-durable subscription is enabled : default oldest message available in topic
-    private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest; 
-
-    
-    /**
-     * @return the subscription name for the consumer in the spout
-     */
-    public String getSubscriptionName() {
-        return subscriptionName;
-    }
-
-    /**
-     * Sets the subscription name for the consumer in the spout.
-     *
-     * @param subscriptionName
-     *            the subscription name
-     */
-    public void setSubscriptionName(String subscriptionName) {
-        this.subscriptionName = subscriptionName;
-    }
-
-    /**
-     * @return the subscription type used by the consumer <i>(default: {@code Shared})</i>
-     */
-    public SubscriptionType getSubscriptionType() {
-        return subscriptionType;
-    }
-
-    /**
-     * Sets the subscription type used by the consumer.
-     *
-     * @param subscriptionType
-     *            the subscription type
-     */
-    public void setSubscriptionType(SubscriptionType subscriptionType) {
-        this.subscriptionType = subscriptionType;
-    }
-
-    /**
-     * @return the mapper to convert pulsar message to a storm tuple
-     */
-    public MessageToValuesMapper getMessageToValuesMapper() {
-        return messageToValuesMapper;
-    }
-
-    /**
-     * Sets the mapper to convert pulsar message to a storm tuple.
-     * <p>
-     * Note: If the mapper returns null, the message is not emitted to the collector and is acked immediately
-     * </p>
-     *
-     * @param mapper
-     *            the mapper; must not be {@code null}
-     * @throws NullPointerException
-     *             if {@code mapper} is {@code null}
-     */
-    public void setMessageToValuesMapper(MessageToValuesMapper mapper) {
-        this.messageToValuesMapper = Objects.requireNonNull(mapper);
-    }
-
-    /**
-     * Returns the retry timeout converted to the requested unit.
-     *
-     * @param unit
-     *            the unit to express the timeout in
-     * @return the timeout for retrying failed messages
-     */
-    public long getFailedRetriesTimeout(TimeUnit unit) {
-        return unit.convert(failedRetriesTimeoutNano, TimeUnit.NANOSECONDS);
-    }
-
-    /**
-     * Sets the timeout within which the spout will re-inject failed messages with an exponential backoff <i>(default:
-     * 60 seconds)</i>. Note: If set to 0, the message will not be retried when failed. If set to &lt; 0, the message
-     * will be retried forever till it is successfully processed or max message retry count is reached, whichever
-     * comes first.
-     *
-     * @param failedRetriesTimeout
-     *            the timeout value
-     * @param unit
-     *            the unit of {@code failedRetriesTimeout}
-     */
-    public void setFailedRetriesTimeout(long failedRetriesTimeout, TimeUnit unit) {
-        this.failedRetriesTimeoutNano = unit.toNanos(failedRetriesTimeout);
-    }
-
-    /**
-     *
-     * @return the maximum number of times a failed message will be retried
-     */
-    public int getMaxFailedRetries() {
-        return maxFailedRetries;
-    }
-
-    /**
-     * Sets the maximum number of times the spout will re-inject failed messages with an exponential backoff
-     * <i>(default: -1)</i>. Note: If set to 0, the message will not be retried when failed. If set to &lt; 0, the
-     * message will be retried forever till it is successfully processed or configured timeout expires, whichever
-     * comes first.
-     *
-     * @param maxFailedRetries
-     *            the maximum number of retries
-     */
-    public void setMaxFailedRetries(int maxFailedRetries) {
-        this.maxFailedRetries = maxFailedRetries;
-    }
-
-    /**
-     *
-     * @return if the consumer is shared across different executors of a spout
-     */
-    public boolean isSharedConsumerEnabled() {
-        return sharedConsumerEnabled;
-    }
-
-    /**
-     * Sets whether the consumer will be shared across different executors of a spout. <i>(default: false)</i>
-     *
-     * @param sharedConsumerEnabled
-     *            {@code true} to share the consumer
-     */
-    public void setSharedConsumerEnabled(boolean sharedConsumerEnabled) {
-        this.sharedConsumerEnabled = sharedConsumerEnabled;
-    }
-    
-    /**
-     * @return whether the subscription is unsubscribed when the spout gets closed <i>(default: false)</i>
-     */
-    public boolean isAutoUnsubscribe() {
-        return autoUnsubscribe;
-    }
-
-    /**
-     * Sets whether the subscription is unsubscribed when the spout gets closed in the topology.
-     * 
-     * @param autoUnsubscribe
-     *            {@code true} to unsubscribe on close
-     */
-    public void setAutoUnsubscribe(boolean autoUnsubscribe) {
-        this.autoUnsubscribe = autoUnsubscribe;
-    }
-    
-    /**
-     * @return whether the subscription is durable <i>(default: true)</i>
-     */
-    public boolean isDurableSubscription() {
-        return durableSubscription;
-    }
-
-    /**
-     * Sets whether the subscription is durable. If it is not durable, a non-durable reader is created that starts
-     * reading from the position set with {@link #setNonDurableSubscriptionReadPosition(MessageId)}.
-     * 
-     * @param durableSubscription
-     *            {@code false} to read through a non-durable reader
-     */
-    public void setDurableSubscription(boolean durableSubscription) {
-        this.durableSubscription = durableSubscription;
-    }
-
-    /**
-     * @return the start position for a non-durable subscription <i>(default: earliest)</i>
-     */
-    public MessageId getNonDurableSubscriptionReadPosition() {
-        return nonDurableSubscriptionReadPosition;
-    }
-
-    /**
-     * Non-durable-subscription/Reader can be set to start reading from a specific position earliest/latest.
-     * 
-     * @param nonDurableSubscriptionReadPosition
-     *            the message id to start reading from
-     */
-    public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) {
-        this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition;
-    }
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
deleted file mode 100644
index 5502a62..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-public interface PulsarSpoutConsumer {
-
-    /**
-     * Receives a single message.
-     * 
-     * @param waitTime
-     *            how long to wait for a message
-     * @param unit
-     *            the unit of {@code waitTime}
-     * @return the received message; presumably {@code null} when nothing arrived within the wait time (confirm
-     *         against the implementations)
-     * @throws PulsarClientException
-     *             if receiving fails
-     */
-    Message<byte[]> receive(int waitTime, TimeUnit unit) throws PulsarClientException;
-    
-    /**
-     * Ack the message async.
-     * 
-     * @param msg
-     *            the message to acknowledge
-     */
-    void acknowledgeAsync(Message<?> msg);
-
-    /**
-     * Unsubscribe the consumer.
-     *
-     * @throws PulsarClientException
-     *             if unsubscribing fails
-     */
-    void unsubscribe() throws PulsarClientException;
-    
-    /**
-     * Close the consumer.
-     * 
-     * @throws PulsarClientException
-     *             if closing fails
-     */
-    void close() throws PulsarClientException;
-
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java
deleted file mode 100644
index 3f0597b..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.io.Serializable;
-
-/**
- * Base configuration shared by the Pulsar Storm components: service URL, topic and the metrics reporting interval.
- */
-public class PulsarStormConfiguration implements Serializable {
-
-    /**
-     * Serialization version identifier.
-     */
-    private static final long serialVersionUID = 1L;
-
-    public static final int DEFAULT_METRICS_TIME_INTERVAL_IN_SECS = 60;
-
-    private String serviceUrl = null;
-    private String topic = null;
-    private int metricsTimeIntervalInSecs = DEFAULT_METRICS_TIME_INTERVAL_IN_SECS;
-
-    /**
-     * @return the service URL to connect to from the client
-     */
-    public String getServiceUrl() {
-        return serviceUrl;
-    }
-
-    /**
-     * Sets the service URL to connect to from the client.
-     *
-     * @param serviceUrl
-     *            the service URL
-     */
-    public void setServiceUrl(String serviceUrl) {
-        this.serviceUrl = serviceUrl;
-    }
-
-    /**
-     * @return the topic name for the producer/consumer
-     */
-    public String getTopic() {
-        return topic;
-    }
-
-    /**
-     * Sets the topic name for the producer/consumer. It should be of the format
-     * {@code {persistent|non-persistent}://{property}/{cluster}/{namespace}/{topic}}. The value is stored as given;
-     * no validation is performed here.
-     *
-     * @param topic
-     *            the topic name
-     */
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    /**
-     * @return the time interval in seconds for metrics generation
-     */
-    public int getMetricsTimeIntervalInSecs() {
-        return metricsTimeIntervalInSecs;
-    }
-
-    /**
-     * Sets the time interval in seconds for metrics generation <i>(default: 60 seconds)</i>.
-     *
-     * @param metricsTimeIntervalInSecs
-     *            the interval in seconds
-     */
-    public void setMetricsTimeIntervalInSecs(int metricsTimeIntervalInSecs) {
-        this.metricsTimeIntervalInSecs = metricsTimeIntervalInSecs;
-    }
-
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
deleted file mode 100644
index b000827..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-
-import org.apache.storm.tuple.Values;
-
-/**
- * Returned by MessageToValuesMapper, this specifies the Values
- * for an output tuple and the stream it should be sent to.
- */
-public class PulsarTuple extends Values {
-
-    // name of the stream the values are to be emitted on
-    protected final String outputStream;
-
-    /**
-     * Creates a tuple bound to an output stream.
-     *
-     * @param outStream
-     *            the stream the tuple should be emitted on
-     * @param values
-     *            the tuple values, passed on to {@link Values}
-     */
-    public PulsarTuple(String outStream, Object ... values) {
-        super(values);
-        outputStream = outStream;
-    }
-
-    /**
-     * Return stream the tuple should be emitted on.
-     *
-     * @return the output stream name given at construction
-     */
-    public String getOutputStream() {
-        return outputStream;
-    }
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
deleted file mode 100644
index f4950c6..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Pulsar client, together with at most one consumer, reader and producer, shared by all tasks of one
- * spout/bolt component. Instances are registered per component id and handed out through
- * {@link #get(String, ClientConfigurationData)}.
- * <p>
- * Every {@code getClient()}/{@code getShared...()} call increments a usage counter and every {@link #close()}
- * decrements it; the underlying client is closed when the counter drops to zero or below.
- * </p>
- */
-public class SharedPulsarClient {
-    private static final Logger LOG = LoggerFactory.getLogger(SharedPulsarClient.class);
-    private static final ConcurrentMap<String, SharedPulsarClient> instances = new ConcurrentHashMap<>();
-
-    private final String componentId;
-    private final PulsarClientImpl client;
-    private final AtomicInteger counter = new AtomicInteger();
-
-    // lazily created under the instance lock, see the getShared...() methods
-    private Consumer<byte[]> consumer;
-    private Reader<byte[]> reader;
-    private Producer<byte[]> producer;
-
-    private SharedPulsarClient(String componentId, ClientConfigurationData clientConf)
-            throws PulsarClientException {
-        this.client = new PulsarClientImpl(clientConf);
-        this.componentId = componentId;
-    }
-
-    /**
-     * Provides a shared pulsar client that is shared across all different tasks in the same component. Different
-     * components will not share the pulsar client since they can have different configurations.
-     *
-     * @param componentId
-     *            the id of the spout/bolt
-     * @param clientConf
-     *            the client configuration, used only when a new client has to be created
-     * @return the shared client registered for {@code componentId}
-     * @throws PulsarClientException
-     *             if a new client had to be created and its creation failed
-     */
-    public static SharedPulsarClient get(String componentId, ClientConfigurationData clientConf)
-            throws PulsarClientException {
-        // the mapping function cannot throw a checked exception, so a failure is carried out of the lambda here
-        AtomicReference<PulsarClientException> failure = new AtomicReference<>();
-        SharedPulsarClient shared = instances.computeIfAbsent(componentId, id -> {
-            try {
-                SharedPulsarClient created = new SharedPulsarClient(id, clientConf);
-                LOG.info("[{}] Created a new Pulsar Client.", id);
-                return created;
-            } catch (PulsarClientException e) {
-                failure.set(e);
-                // returning null leaves the map without a mapping for this component
-                return null;
-            }
-        });
-        if (failure.get() != null) {
-            throw failure.get();
-        }
-        // return the value computeIfAbsent produced; a second map lookup could observe a concurrent close()
-        // having removed the entry and hand null to the caller
-        return shared;
-    }
-
-    /**
-     * Returns the underlying client and counts the caller as a user of this shared instance.
-     *
-     * @return the underlying Pulsar client
-     */
-    public PulsarClientImpl getClient() {
-        counter.incrementAndGet();
-        return client;
-    }
-
-    /**
-     * Returns the consumer shared by the component, subscribing on first use, and counts the caller as a user.
-     *
-     * @param consumerConf
-     *            the configuration used if the consumer has to be created
-     * @return the shared consumer
-     * @throws PulsarClientException
-     *             if the subscription fails
-     */
-    public Consumer<byte[]> getSharedConsumer(ConsumerConfigurationData<byte[]> consumerConf)
-            throws PulsarClientException {
-        counter.incrementAndGet();
-        synchronized (this) {
-            if (consumer == null) {
-                try {
-                    consumer = client.subscribeAsync(consumerConf).join();
-                } catch (CompletionException e) {
-                    throw toPulsarClientException(e);
-                }
-                LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, consumerConf.getSingleTopic());
-            } else {
-                LOG.info("[{}] Using a shared consumer on {}", componentId, consumerConf.getSingleTopic());
-            }
-            // read the field while still holding the lock that guards its initialization
-            return consumer;
-        }
-    }
-
-    /**
-     * Returns the reader shared by the component, creating it on first use, and counts the caller as a user.
-     *
-     * @param readerConf
-     *            the configuration used if the reader has to be created
-     * @return the shared reader
-     * @throws PulsarClientException
-     *             if the reader cannot be created
-     */
-    public Reader<byte[]> getSharedReader(ReaderConfigurationData<byte[]> readerConf) throws PulsarClientException {
-        counter.incrementAndGet();
-        synchronized (this) {
-            if (reader == null) {
-                try {
-                    reader = client.createReaderAsync(readerConf).join();
-                } catch (CompletionException e) {
-                    throw toPulsarClientException(e);
-                }
-                LOG.info("[{}] Created a new Pulsar reader on {}", componentId, readerConf.getTopicName());
-            } else {
-                LOG.info("[{}] Using a shared reader on {}", componentId, readerConf.getTopicName());
-            }
-            return reader;
-        }
-    }
-
-    /**
-     * Returns the producer shared by the component, creating it on first use, and counts the caller as a user.
-     *
-     * @param producerConf
-     *            the configuration used if the producer has to be created
-     * @return the shared producer
-     * @throws PulsarClientException
-     *             if the producer cannot be created
-     */
-    public Producer<byte[]> getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException {
-        counter.incrementAndGet();
-        synchronized (this) {
-            if (producer == null) {
-                try {
-                    producer = client.createProducerAsync(producerConf).join();
-                } catch (CompletionException e) {
-                    throw toPulsarClientException(e);
-                }
-                LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, producerConf.getTopicName());
-            } else {
-                LOG.info("[{}] Using a shared producer on {}", componentId, producerConf.getTopicName());
-            }
-            return producer;
-        }
-    }
-
-    /**
-     * Releases one usage of this shared client. When the usage counter drops to zero or below, the underlying
-     * client is closed and this instance is removed from the registry.
-     *
-     * @throws PulsarClientException
-     *             if closing the underlying client fails
-     */
-    public void close() throws PulsarClientException {
-        if (counter.decrementAndGet() <= 0) {
-            try {
-                client.close();
-                LOG.info("[{}] Closed Pulsar Client", componentId);
-            } finally {
-                // unregister even if close() failed so later get() calls do not receive this instance again;
-                // the two-argument remove leaves a newer instance for the same component untouched
-                instances.remove(componentId, this);
-            }
-        }
-    }
-
-    /**
-     * Unwraps the failure reported by {@code CompletableFuture.join()}. A cause that is not a
-     * {@link PulsarClientException} is wrapped instead of being cast, which would fail with a ClassCastException.
-     */
-    private static PulsarClientException toPulsarClientException(CompletionException e) {
-        Throwable cause = e.getCause();
-        if (cause instanceof PulsarClientException) {
-            return (PulsarClientException) cause;
-        }
-        return new PulsarClientException(cause != null ? cause : e);
-    }
-
-}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
deleted file mode 100644
index 452e0ce..0000000
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.io.Serializable;
-
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-
-public interface TupleToMessageMapper extends Serializable {
-
-    /**
-     * Convert tuple to {@link org.apache.pulsar.client.api.Message}.
-     *
-     * @param tuple
-     * @return
-     * @deprecated use {@link #toMessage(TypedMessageBuilder, Tuple)}
-     */
-    @Deprecated
-    default Message<byte[]> toMessage(Tuple tuple) {
-        return null;
-    }
-
-    /**
-     * Set the value on a message builder to prepare the message to be published from the Bolt.
-     *
-     * @param tuple
-     * @return
-     */
-    default TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
-        // Default implementation provided for backward compatibility
-        Message<byte[]> msg = toMessage(tuple);
-        msgBuilder.value(msg.getData())
-            .properties(msg.getProperties());
-        if (msg.hasKey()) {
-            msgBuilder.key(msg.getKey());
-        }
-        return msgBuilder;
-    }
-
-
-    /**
-     * Declare the output schema for the bolt.
-     *
-     * @param declarer
-     */
-    public void declareOutputFields(OutputFieldsDeclarer declarer);
-}
diff --git a/pulsar-storm/src/main/javadoc/overview.html b/pulsar-storm/src/main/javadoc/overview.html
deleted file mode 100644
index a1595eb..0000000
--- a/pulsar-storm/src/main/javadoc/overview.html
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
-<HTML>
-  <HEAD>
-    <TITLE>Pulsar Storm API Overview</TITLE>
-  </HEAD>
-  <BODY>
-    The Pulsar Storm API is a proprietary messaging API.
-  </BODY>
-</HTML>
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
deleted file mode 100644
index e6cbc51..0000000
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Values;
-import org.mockito.ArgumentCaptor;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.Maps;
-
-public class PulsarSpoutTest {
-
-    @Test
-    public void testAckFailedMessage() throws Exception {
-
-        PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
-        conf.setServiceUrl("http://localhost:8080");
-        conf.setSubscriptionName("sub1");
-        conf.setTopic("persistent://prop/ns1/topic1");
-        conf.setSubscriptionType(SubscriptionType.Exclusive);
-        conf.setMessageToValuesMapper(new MessageToValuesMapper() {
-            @Override
-            public Values toValues(Message<byte[]> msg) {
-                return null;
-            }
-
-            @Override
-            public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            }
-
-        });
-
-        ClientBuilder builder = spy(new ClientBuilderImpl());
-        PulsarSpout spout = spy(new PulsarSpout(conf, builder));
-
-        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
-                new byte[0], Schema.BYTES, new MessageMetadata());
-        Consumer<byte[]> consumer = mock(Consumer.class);
-        SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        future.complete(null);
-        doReturn(future).when(consumer).acknowledgeAsync(msg.getMessageId());
-        Field consField = PulsarSpout.class.getDeclaredField("consumer");
-        consField.setAccessible(true);
-        consField.set(spout, spoutConsumer);
-
-        spout.fail(msg);
-        spout.ack(msg);
-        spout.emitNextAvailableTuple();
-        verify(consumer, atLeast(1)).receive(anyInt(), any());
-    }
-
-    @Test
-    public void testPulsarTuple() throws Exception {
-        testPulsarSpout(true);
-    }
-
-    @Test
-    public void testPulsarSpout() throws Exception {
-        testPulsarSpout(false);
-    }
-
-    public void testPulsarSpout(boolean pulsarTuple) throws Exception {
-        PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
-        conf.setServiceUrl("http://localhost:8080");
-        conf.setSubscriptionName("sub1");
-        conf.setTopic("persistent://prop/ns1/topic1");
-        conf.setSubscriptionType(SubscriptionType.Exclusive);
-        conf.setSharedConsumerEnabled(true);
-        AtomicBoolean called = new AtomicBoolean(false);
-        conf.setMessageToValuesMapper(new MessageToValuesMapper() {
-            @Override
-            public Values toValues(Message<byte[]> msg) {
-                called.set(true);
-                if ("message to be dropped".equals(new String(msg.getData()))) {
-                    return null;
-                }
-                String val = new String(msg.getData());
-                if (val.startsWith("stream:")) {
-                    String stream = val.split(":")[1];
-                    return new PulsarTuple(stream, val);
-                }
-                return new Values(val);
-            }
-
-            @Override
-            public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            }
-
-        });
-
-        String msgContent = pulsarTuple ? "stream:pstream" : "test";
-
-        ClientBuilder builder = spy(new ClientBuilderImpl());
-        PulsarSpout spout = spy(new PulsarSpout(conf, builder));
-        TopologyContext context = mock(TopologyContext.class);
-        final String componentId = "test-component-id";
-        doReturn(componentId).when(context).getThisComponentId();
-        SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
-        Map config = new HashMap<>();
-        Field field = SharedPulsarClient.class.getDeclaredField("instances");
-        field.setAccessible(true);
-        ConcurrentMap<String, SharedPulsarClient> instances = (ConcurrentMap<String, SharedPulsarClient>) field
-                .get(SharedPulsarClient.class);
-
-        SharedPulsarClient client = mock(SharedPulsarClient.class);
-        Consumer<byte[]> consumer = mock(Consumer.class);
-        when(client.getSharedConsumer(any())).thenReturn(consumer);
-        instances.put(componentId, client);
-
-        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
-                msgContent.getBytes(), Schema.BYTES, new MessageMetadata());
-        when(consumer.receive(anyInt(), any())).thenReturn(msg);
-
-        spout.open(config, context, collector);
-        spout.emitNextAvailableTuple();
-
-        assertTrue(called.get());
-        verify(consumer, atLeast(1)).receive(anyInt(), any());
-        ArgumentCaptor<Values> capt = ArgumentCaptor.forClass(Values.class);
-        if (pulsarTuple) {
-            verify(collector, times(1)).emit(eq("pstream"), capt.capture(), eq(msg));
-        } else {
-            verify(collector, times(1)).emit(capt.capture(), eq(msg));
-        }
-        Values vals = capt.getValue();
-        assertEquals(msgContent, vals.get(0));
-    }
-
-}
diff --git a/tests/pom.xml b/tests/pom.xml
index 91b3416..188a296 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -33,7 +33,6 @@
   <name>Apache Pulsar Adapters :: Tests</name>
   <modules>
     <module>pulsar-kafka-compat-client-test</module>
-    <module>pulsar-storm-test</module>
     <module>pulsar-spark-test</module>
   </modules>
   <build>
diff --git a/tests/pulsar-storm-test/pom.xml b/tests/pulsar-storm-test/pom.xml
deleted file mode 100644
index 3134328..0000000
--- a/tests/pulsar-storm-test/pom.xml
+++ /dev/null
@@ -1,131 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
-  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.pulsar.tests</groupId>
-    <artifactId>adapters-tests-parent</artifactId>
-    <version>2.11.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>pulsar-storm-test</artifactId>
-  <packaging>jar</packaging>
-  <name>Pulsar Storm adapter Tests</name>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-storm</artifactId>
-      <version>2.11.0-SNAPSHOT</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.pulsar</groupId>
-          <artifactId>pulsar-client</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-util</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.storm</groupId>
-      <artifactId>storm-server</artifactId>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
-        </exclusion>
-        <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>log4j-over-slf4j</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>buildtools</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-broker</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-broker</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>testmocks</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.asynchttpclient</groupId>
-      <artifactId>async-http-client</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.storm</groupId>
-      <artifactId>storm-core</artifactId>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>ch.qos.logback</groupId>
-          <artifactId>logback-classic</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>log4j-over-slf4j</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-  </dependencies>
-</project>
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
deleted file mode 100644
index 4355ad6..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.storm.task.IOutputCollector;
-import org.apache.storm.tuple.Tuple;
-
-public class MockOutputCollector implements IOutputCollector {
-
-    private boolean acked = false;
-    private boolean failed = false;
-    private Throwable lastError = null;
-    private Tuple ackedTuple = null;
-    private int numTuplesAcked = 0;
-
-    @Override
-    public void reportError(Throwable error) {
-        lastError = error;
-    }
-
-    @Override
-    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-        return null;
-    }
-
-    @Override
-    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-    }
-
-    @Override
-    public void ack(Tuple input) {
-        acked = true;
-        failed = false;
-        ackedTuple = input;
-        ++numTuplesAcked;
-    }
-
-    @Override
-    public void fail(Tuple input) {
-        failed = true;
-        acked = false;
-    }
-
-    @Override
-    public void resetTimeout(Tuple tuple) {
-
-    }
-
-    public boolean acked() {
-        return acked;
-    }
-
-    public boolean failed() {
-        return failed;
-    }
-
-    public Throwable getLastError() {
-        return lastError;
-    }
-
-    public Tuple getAckedTuple() {
-        return ackedTuple;
-    }
-
-    public int getNumTuplesAcked() {
-        return numTuplesAcked;
-    }
-
-    public void reset() {
-        acked = false;
-        failed = false;
-        lastError = null;
-        ackedTuple = null;
-        numTuplesAcked = 0;
-    }
-
-    @Override
-    public void flush() {
-        // Nothing to flush from buffer
-    }
-
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
deleted file mode 100644
index 98c8d20..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.pulsar.client.api.Message;
-
-import org.apache.storm.spout.ISpoutOutputCollector;
-
-public class MockSpoutOutputCollector implements ISpoutOutputCollector {
-
-    private boolean emitted = false;
-    private Message lastMessage = null;
-    private String data = null;
-
-    @Override
-    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
-        emitted = true;
-        data = (String) tuple.get(0);
-        lastMessage = (Message) messageId;
-        return new ArrayList<Integer>();
-    }
-
-    @Override
-    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
-        emitted = true;
-        data = (String) tuple.get(0);
-        lastMessage = (Message) messageId;
-    }
-
-    @Override
-    public long getPendingCount() {
-        return 0;
-    }
-
-    @Override
-    public void reportError(Throwable error) {
-    }
-
-    public boolean emitted() {
-        return emitted;
-    }
-
-    public String getTupleData() {
-        return data;
-    }
-
-    public Message getLastMessage() {
-        return lastMessage;
-    }
-
-    public void reset() {
-        emitted = false;
-        data = null;
-        lastMessage = null;
-    }
-    
-    @Override
-    public void flush() {
-        // Nothing to flush from buffer
-    }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
deleted file mode 100644
index b90e855..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.fail;
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-public class PulsarBoltTest extends ProducerConsumerBase {
-
-    private static final int NO_OF_RETRIES = 10;
-
-    public String serviceUrl;
-    public final String topic = "persistent://my-property/my-ns/my-topic1";
-    public final String subscriptionName = "my-subscriber-name";
-
-    protected PulsarBoltConfiguration pulsarBoltConf;
-    protected PulsarBolt bolt;
-    protected MockOutputCollector mockCollector;
-    protected Consumer consumer;
-
-    @Override
-    @BeforeMethod
-    public void beforeMethod(Method m) throws Exception {
-        super.beforeMethod(m);
-        setup();
-    }
-
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-
-        serviceUrl = pulsar.getWebServiceAddress();
-
-        pulsarBoltConf = new PulsarBoltConfiguration();
-        pulsarBoltConf.setServiceUrl(serviceUrl);
-        pulsarBoltConf.setTopic(topic);
-        pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
-        pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
-        bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
-        mockCollector = new MockOutputCollector();
-        OutputCollector collector = new OutputCollector(mockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName);
-        when(context.getThisTaskId()).thenReturn(0);
-        bolt.prepare(Maps.newHashMap(), context, collector);
-        consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName).subscribe();
-    }
-
-    @AfterMethod
-    public void cleanup() throws Exception {
-        bolt.close();
-        consumer.close();
-        super.internalCleanup();
-    }
-
-    @SuppressWarnings("serial")
-    static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
-
-        @Override
-        public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
-            if ("message to be dropped".equals(new String(tuple.getBinary(0)))) {
-                return null;
-            }
-            if ("throw exception".equals(new String(tuple.getBinary(0)))) {
-                throw new RuntimeException();
-            }
-            return msgBuilder.value(tuple.getBinary(0));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        }
-    };
-
-    private Tuple getMockTuple(String msgContent) {
-        Tuple mockTuple = mock(Tuple.class);
-        when(mockTuple.getBinary(0)).thenReturn(msgContent.getBytes());
-        when(mockTuple.getSourceComponent()).thenReturn("");
-        when(mockTuple.getSourceStreamId()).thenReturn("");
-        return mockTuple;
-    }
-
-    @Test
-    public void testBasic() throws Exception {
-        String msgContent = "hello world";
-        Tuple tuple = getMockTuple(msgContent);
-        bolt.execute(tuple);
-        for (int i = 0; i < NO_OF_RETRIES; i++) {
-            Thread.sleep(1000);
-            if (mockCollector.acked()) {
-                break;
-            }
-        }
-        Assert.assertTrue(mockCollector.acked());
-        Assert.assertFalse(mockCollector.failed());
-        Assert.assertNull(mockCollector.getLastError());
-        Assert.assertEquals(tuple, mockCollector.getAckedTuple());
-        Message msg = consumer.receive(5, TimeUnit.SECONDS);
-        consumer.acknowledge(msg);
-        Assert.assertEquals(msgContent, new String(msg.getData()));
-    }
-
-    @Test
-    public void testExecuteFailure() throws Exception {
-        String msgContent = "throw exception";
-        Tuple tuple = getMockTuple(msgContent);
-        bolt.execute(tuple);
-        Assert.assertFalse(mockCollector.acked());
-        Assert.assertTrue(mockCollector.failed());
-        Assert.assertNotNull(mockCollector.getLastError());
-    }
-
-    @Test
-    public void testNoMessageSend() throws Exception {
-        String msgContent = "message to be dropped";
-        Tuple tuple = getMockTuple(msgContent);
-        bolt.execute(tuple);
-        Assert.assertTrue(mockCollector.acked());
-        Message msg = consumer.receive(5, TimeUnit.SECONDS);
-        Assert.assertNull(msg);
-    }
-
-    @Test
-    public void testMetrics() throws Exception {
-        bolt.resetMetrics();
-        String msgContent = "hello world";
-        Tuple tuple = getMockTuple(msgContent);
-        for (int i = 0; i < 10; i++) {
-            bolt.execute(tuple);
-        }
-        for (int i = 0; i < NO_OF_RETRIES; i++) {
-            Thread.sleep(1000);
-            if (mockCollector.getNumTuplesAcked() == 10) {
-                break;
-            }
-        }
-        @SuppressWarnings("rawtypes")
-        Map metrics = (Map) bolt.getValueAndReset();
-        Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 10);
-        Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_RATE)).doubleValue(),
-                10.0 / pulsarBoltConf.getMetricsTimeIntervalInSecs());
-        Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_THROUGHPUT_BYTES)).doubleValue(),
-                ((double) msgContent.getBytes().length * 10) / pulsarBoltConf.getMetricsTimeIntervalInSecs());
-        metrics = bolt.getMetrics();
-        Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 0);
-        for (int i = 0; i < 10; i++) {
-            Message msg = consumer.receive(5, TimeUnit.SECONDS);
-            consumer.acknowledge(msg);
-        }
-    }
-
-    @Test
-    public void testSharedProducer() throws Exception {
-        TopicStats topicStats = admin.topics().getStats(topic);
-        Assert.assertEquals(topicStats.getPublishers().size(), 1);
-        PulsarBolt otherBolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
-        MockOutputCollector otherMockCollector = new MockOutputCollector();
-        OutputCollector collector = new OutputCollector(otherMockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName);
-        when(context.getThisTaskId()).thenReturn(1);
-        otherBolt.prepare(Maps.newHashMap(), context, collector);
-
-        topicStats = admin.topics().getStats(topic);
-        Assert.assertEquals(topicStats.getPublishers().size(), 1);
-
-        otherBolt.close();
-
-        topicStats = admin.topics().getStats(topic);
-        Assert.assertEquals(topicStats.getPublishers().size(), 1);
-    }
-
-    @Test
-    public void testSerializability() throws Exception {
-        // test serializability with no auth
-        PulsarBolt boltWithNoAuth = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
-        TestUtil.testSerializability(boltWithNoAuth);
-    }
-
-    @Test
-    public void testFailedProducer() {
-        PulsarBoltConfiguration pulsarBoltConf = new PulsarBoltConfiguration();
-        pulsarBoltConf.setServiceUrl(serviceUrl);
-        pulsarBoltConf.setTopic("persistent://invalid");
-        pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
-        pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
-        PulsarBolt bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
-        MockOutputCollector mockCollector = new MockOutputCollector();
-        OutputCollector collector = new OutputCollector(mockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("new" + methodName);
-        when(context.getThisTaskId()).thenReturn(0);
-        try {
-            bolt.prepare(Maps.newHashMap(), context, collector);
-            fail("should have failed as producer creation failed");
-        } catch (IllegalStateException ie) {
-            // Ok.
-        }
-    }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
deleted file mode 100644
index 322e41b..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Values;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-public class PulsarSpoutTest extends ProducerConsumerBase {
-
-    public String serviceUrl;
-    public final String topic = "persistent://my-property/my-ns/my-topic1";
-    public final String subscriptionName = "my-subscriber-name";
-
-    protected PulsarSpoutConfiguration pulsarSpoutConf;
-    protected PulsarSpout spout;
-    protected MockSpoutOutputCollector mockCollector;
-    protected Producer producer;
-
-    @Override
-    @BeforeMethod
-    public void beforeMethod(Method m) throws Exception {
-        super.beforeMethod(m);
-        setup();
-    }
-
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-
-        serviceUrl = pulsar.getWebServiceAddress();
-
-        pulsarSpoutConf = new PulsarSpoutConfiguration();
-        pulsarSpoutConf.setServiceUrl(serviceUrl);
-        pulsarSpoutConf.setTopic(topic);
-        pulsarSpoutConf.setSubscriptionName(subscriptionName);
-        pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
-        pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
-        pulsarSpoutConf.setMaxFailedRetries(2);
-        pulsarSpoutConf.setSharedConsumerEnabled(true);
-        pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
-        pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared);
-        spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
-        mockCollector = new MockSpoutOutputCollector();
-        SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
-        when(context.getThisTaskId()).thenReturn(0);
-        spout.open(Maps.newHashMap(), context, collector);
-        producer = pulsarClient.newProducer().topic(topic).create();
-    }
-
-    @AfterMethod
-    public void cleanup() throws Exception {
-        producer.close();
-        spout.close();
-        super.internalCleanup();
-    }
-
-    @SuppressWarnings("serial")
-    public static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
-
-        @Override
-        public Values toValues(Message msg) {
-            if ("message to be dropped".equals(new String(msg.getData()))) {
-                return null;
-            }
-            return new Values(new String(msg.getData()));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        }
-    };
-
-    @Test
-    public void testBasic() throws Exception {
-        String msgContent = "hello world";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.ack(mockCollector.getLastMessage());
-    }
-
-    @Test
-    public void testRedeliverOnFail() throws Exception {
-        String msgContent = "hello world";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        spout.fail(mockCollector.getLastMessage());
-        mockCollector.reset();
-        Thread.sleep(150);
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.ack(mockCollector.getLastMessage());
-    }
-
-    @Test
-    public void testNoRedeliverOnAck() throws Exception {
-        String msgContent = "hello world";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        spout.ack(mockCollector.getLastMessage());
-        mockCollector.reset();
-        spout.nextTuple();
-        assertFalse(mockCollector.emitted());
-        assertNull(mockCollector.getTupleData());
-    }
-
-    @Test
-    public void testLimitedRedeliveriesOnTimeout() throws Exception {
-        String msgContent = "chuck norris";
-        producer.send(msgContent.getBytes());
-
-        long startTime = System.currentTimeMillis();
-        while (startTime + pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.MILLISECONDS) > System
-                .currentTimeMillis()) {
-            mockCollector.reset();
-            spout.nextTuple();
-            assertTrue(mockCollector.emitted());
-            assertEquals(mockCollector.getTupleData(), msgContent);
-            spout.fail(mockCollector.getLastMessage());
-            // wait to avoid backoff
-            Thread.sleep(500);
-        }
-        spout.nextTuple();
-        spout.fail(mockCollector.getLastMessage());
-        mockCollector.reset();
-        Thread.sleep(500);
-        spout.nextTuple();
-        assertFalse(mockCollector.emitted());
-        assertNull(mockCollector.getTupleData());
-    }
-
-    @Test
-    public void testLimitedRedeliveriesOnCount() throws Exception {
-        String msgContent = "hello world";
-        producer.send(msgContent.getBytes());
-
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.fail(mockCollector.getLastMessage());
-
-        mockCollector.reset();
-        Thread.sleep(150);
-
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.fail(mockCollector.getLastMessage());
-
-        mockCollector.reset();
-        Thread.sleep(300);
-
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.fail(mockCollector.getLastMessage());
-
-        mockCollector.reset();
-        Thread.sleep(500);
-        spout.nextTuple();
-        assertFalse(mockCollector.emitted());
-        assertNull(mockCollector.getTupleData());
-    }
-
-    @Test
-    public void testBackoffOnRetry() throws Exception {
-        String msgContent = "chuck norris";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        spout.fail(mockCollector.getLastMessage());
-        mockCollector.reset();
-        // due to backoff we should not get the message again immediately
-        spout.nextTuple();
-        assertFalse(mockCollector.emitted());
-        assertNull(mockCollector.getTupleData());
-        Thread.sleep(100);
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.ack(mockCollector.getLastMessage());
-    }
-
-    @Test
-    public void testMessageDrop() throws Exception {
-        String msgContent = "message to be dropped";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        assertFalse(mockCollector.emitted());
-        assertNull(mockCollector.getTupleData());
-    }
-
-    @SuppressWarnings({ "rawtypes" })
-    @Test
-    public void testMetrics() throws Exception {
-        spout.resetMetrics();
-        String msgContent = "hello world";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        Map metrics = spout.getMetrics();
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
-        assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_RATE)).doubleValue(),
-                1.0 / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
-        assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_THROUGHPUT_BYTES)).doubleValue(),
-                ((double) msgContent.getBytes().length) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
-        spout.fail(mockCollector.getLastMessage());
-        metrics = spout.getMetrics();
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
-        Thread.sleep(150);
-        spout.nextTuple();
-        metrics = spout.getMetrics();
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
-        spout.ack(mockCollector.getLastMessage());
-        metrics = (Map) spout.getValueAndReset();
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
-    }
-
-    @Test
-    public void testSharedConsumer() throws Exception {
-        TopicStats topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
-        PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
-        MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
-        SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
-        when(context.getThisTaskId()).thenReturn(1);
-        otherSpout.open(Maps.newHashMap(), context, collector);
-
-        topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
-
-        otherSpout.close();
-
-        topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
-    }
-
-    @Test
-    public void testNoSharedConsumer() throws Exception {
-        TopicStats topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
-        pulsarSpoutConf.setSharedConsumerEnabled(false);
-        PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
-        MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
-        SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
-        when(context.getThisTaskId()).thenReturn(1);
-        otherSpout.open(Maps.newHashMap(), context, collector);
-
-        topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 2);
-
-        otherSpout.close();
-
-        topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
-    }
-
-    @Test
-    public void testSerializability() throws Exception {
-        // test serializability with no auth
-        PulsarSpout spoutWithNoAuth = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
-        TestUtil.testSerializability(spoutWithNoAuth);
-    }
-
-    @Test
-    public void testFailedConsumer() {
-        PulsarSpoutConfiguration pulsarSpoutConf = new PulsarSpoutConfiguration();
-        pulsarSpoutConf.setServiceUrl(serviceUrl);
-        pulsarSpoutConf.setTopic("persistent://invalidTopic");
-        pulsarSpoutConf.setSubscriptionName(subscriptionName);
-        pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
-        pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
-        pulsarSpoutConf.setMaxFailedRetries(2);
-        pulsarSpoutConf.setSharedConsumerEnabled(false);
-        pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
-        pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared);
-        PulsarSpout spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
-        MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
-        SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("new-test" + methodName);
-        when(context.getThisTaskId()).thenReturn(0);
-        try {
-            spout.open(Maps.newHashMap(), context, collector);
-            fail("should have failed as consumer creation failed");
-        } catch (IllegalStateException e) {
-            // Ok
-        }
-    }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
deleted file mode 100644
index a71e088..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-
-import org.testng.Assert;
-
-public class TestUtil {
-
-    public static void testSerializability(Object object) throws Exception {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(out);
-        oos.writeObject(object);
-        oos.close();
-        Assert.assertTrue(out.toByteArray().length > 0);
-    }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
deleted file mode 100644
index 93404ea..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm.example;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.storm.MessageToValuesMapper;
-import org.apache.pulsar.storm.PulsarBolt;
-import org.apache.pulsar.storm.PulsarBoltConfiguration;
-import org.apache.pulsar.storm.PulsarSpout;
-import org.apache.pulsar.storm.PulsarSpoutConfiguration;
-import org.apache.pulsar.storm.TupleToMessageMapper;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.metric.api.IMetricsConsumer;
-import org.apache.storm.task.IErrorReporter;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StormExample {
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class);
-    private static final String serviceUrl = "http://broker-pdev.messaging.corp.usw.example.com:8080";
-
-    @SuppressWarnings("serial")
-    static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
-
-        @Override
-        public Values toValues(Message msg) {
-            return new Values(new String(msg.getData()));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            // declare the output fields
-            declarer.declare(new Fields("string"));
-        }
-    };
-
-    @SuppressWarnings("serial")
-    static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
-
-        @Override
-        public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
-            String receivedMessage = tuple.getString(0);
-            // message processing
-            String processedMsg = receivedMessage + "-processed";
-            return msgBuilder.value(processedMsg.getBytes());
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            // declare the output fields
-        }
-    };
-
-    public static void main(String[] args) throws Exception {
-        // String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
-        // String authParams = "key1:val1,key2:val2";
-        // clientConf.setAuthentication(authPluginClassName, authParams);
-
-        String topic1 = "persistent://my-property/use/my-ns/my-topic1";
-        String topic2 = "persistent://my-property/use/my-ns/my-topic2";
-        String subscriptionName1 = "my-subscriber-name1";
-        String subscriptionName2 = "my-subscriber-name2";
-
-        // create spout
-        PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
-        spoutConf.setServiceUrl(serviceUrl);
-        spoutConf.setTopic(topic1);
-        spoutConf.setSubscriptionName(subscriptionName1);
-        spoutConf.setMessageToValuesMapper(messageToValuesMapper);
-        PulsarSpout spout = new PulsarSpout(spoutConf, PulsarClient.builder());
-
-        // create bolt
-        PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
-        boltConf.setServiceUrl(serviceUrl);
-        boltConf.setTopic(topic2);
-        boltConf.setTupleToMessageMapper(tupleToMessageMapper);
-        PulsarBolt bolt = new PulsarBolt(boltConf, PulsarClient.builder());
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("testSpout", spout);
-        builder.setBolt("testBolt", bolt).shuffleGrouping("testSpout");
-
-        Config conf = new Config();
-        conf.setNumWorkers(2);
-        conf.setDebug(true);
-        conf.registerMetricsConsumer(PulsarMetricsConsumer.class);
-
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("test", conf, builder.createTopology());
-        Utils.sleep(10000);
-
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
-        // create a consumer on topic2 to receive messages from the bolt when the processing is done
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic2).subscriptionName(subscriptionName2).subscribe();
-        // create a producer on topic1 to send messages that will be received by the spout
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic1).create();
-
-        for (int i = 0; i < 10; i++) {
-            String msg = "msg-" + i;
-            producer.send(msg.getBytes());
-            LOG.info("Message {} sent", msg);
-        }
-        Message<byte[]> msg = null;
-        for (int i = 0; i < 10; i++) {
-            msg = consumer.receive(1, TimeUnit.SECONDS);
-            LOG.info("Message {} received", new String(msg.getData()));
-        }
-        cluster.killTopology("test");
-        cluster.shutdown();
-
-    }
-
-    class PulsarMetricsConsumer implements IMetricsConsumer {
-
-        @Override
-        public void prepare(Map stormConf, Object registrationArgument, TopologyContext context,
-                IErrorReporter errorReporter) {
-        }
-
-        @Override
-        public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-            // The collection will contain metrics for all the spouts/bolts that register the metrics in the topology.
-            // The name for the Pulsar Spout is "PulsarSpoutMetrics-{componentId}-{taskIndex}" and for the Pulsar Bolt
-            // is
-            // "PulsarBoltMetrics-{componentId}-{taskIndex}".
-        }
-
-        @Override
-        public void cleanup() {
-        }
-
-    }
-}