You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/06/09 11:44:12 UTC

[1/2] storm git commit: STORM-3097: Remove storm-druid

Repository: storm
Updated Branches:
  refs/heads/master 4aa6b5e18 -> 0eb6b5116


STORM-3097: Remove storm-druid

This closes #2707


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b6cab8dc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b6cab8dc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b6cab8dc

Branch: refs/heads/master
Commit: b6cab8dca97e2238aef8e45118f6946fc2d35049
Parents: 4aa6b5e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Jun 6 15:11:02 2018 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jun 9 20:43:33 2018 +0900

----------------------------------------------------------------------
 docs/index.md                                   |   1 -
 docs/storm-druid.md                             | 119 ---------------
 external/storm-druid/README.md                  | 147 -------------------
 external/storm-druid/pom.xml                    | 111 --------------
 .../apache/storm/druid/bolt/DruidBeamBolt.java  | 116 ---------------
 .../storm/druid/bolt/DruidBeamFactory.java      |  29 ----
 .../apache/storm/druid/bolt/DruidConfig.java    | 104 -------------
 .../druid/bolt/ITupleDruidEventMapper.java      |  38 -----
 .../storm/druid/bolt/TupleDruidEventMapper.java |  44 ------
 .../storm/druid/trident/DruidBeamState.java     |  96 ------------
 .../druid/trident/DruidBeamStateFactory.java    |  42 ------
 .../druid/trident/DruidBeamStateUpdater.java    |  48 ------
 .../storm/druid/SampleDruidBeamFactoryImpl.java | 122 ---------------
 .../storm/druid/SampleDruidBoltTopology.java    |  94 ------------
 .../druid/SampleDruidBoltTridentTopology.java   |  90 ------------
 .../apache/storm/druid/SimpleBatchSpout.java    |  95 ------------
 .../org/apache/storm/druid/SimpleSpout.java     |  68 ---------
 pom.xml                                         |   1 -
 18 files changed, 1365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index b1d881f..2697c47 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -107,7 +107,6 @@ But small change will not affect the user experience. We will notify the user wh
 * [Mongodb Integration](storm-mongodb.html)
 * [OpenTSDB Integration](storm-opentsdb.html)
 * [Kinesis Integration](storm-kinesis.html)
-* [Druid Integration](storm-druid.html)
 * [PMML Integration](storm-pmml.html)
 * [Kestrel Integration](Kestrel-and-Storm.html)
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/docs/storm-druid.md
----------------------------------------------------------------------
diff --git a/docs/storm-druid.md b/docs/storm-druid.md
deleted file mode 100644
index bed50dc..0000000
--- a/docs/storm-druid.md
+++ /dev/null
@@ -1,119 +0,0 @@
-# Storm Druid Bolt and TridentState
-
-This module provides core Storm and Trident bolt implementations for writing data to [Druid](http://druid.io/) data store.
-This implementation uses Druid's [Tranquility library](https://github.com/druid-io/tranquility) to send messages to druid.
-
-Some of the implementation details are borrowed from existing [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md).
-This new Bolt added to support latest storm release and maintain the bolt in the storm repo.
-
-### Core Bolt
-Below example describes the usage of core bolt which is `org.apache.storm.druid.bolt.DruidBeamBolt`
-By default this Bolt expects to receive tuples in which "event" field gives your event type.
-This logic can be changed by implementing ITupleDruidEventMapper interface.
-
-```java
-
-   DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
-   DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
-   ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-   DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
-   topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
-   topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());
-
-```
-
-
-### Trident State
-
-```java
-    DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
-    ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-
-    final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
-
-    stream.peek(new Consumer() {
-        @Override
-        public void accept(TridentTuple input) {
-             LOG.info("########### Received tuple: [{}]", input);
-         }
-    }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
-
-```
-
-### Sample Beam Factory Implementation
-Druid bolt must be supplied with a BeamFactory. You can implement one of these using the [DruidBeams builder's] (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) "buildBeam()" method.
-See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
-For more details refer [Tranquility library](https://github.com/druid-io/tranquility) docs.
-
-```java
-
-public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
-
-    @Override
-    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
-
-
-        final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
-        final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
-        final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables.
-        final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
-        List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
-                new CountAggregatorFactory(
-                        "click"
-                )
-        );
-        // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
-        final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
-        {
-            @Override
-            public DateTime timestamp(Map<String, Object> theMap)
-            {
-                return new DateTime(theMap.get("timestamp"));
-            }
-        };
-
-        // Tranquility uses ZooKeeper (through Curator) for coordination.
-        final CuratorFramework curator = CuratorFrameworkFactory
-                .builder()
-                .connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf
-                .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
-                .build();
-        curator.start();
-
-        // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
-        // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
-        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
-
-        // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
-        // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.
-        // In this case, we won't provide one, so we're just using Jackson.
-        final Beam<Map<String, Object>> beam = DruidBeams
-                .builder(timestamper)
-                .curator(curator)
-                .discoveryPath(discoveryPath)
-                .location(DruidLocation.create(indexService, dataSource))
-                .timestampSpec(timestampSpec)
-                .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
-                .tuning(
-                        ClusteredBeamTuning
-                                .builder()
-                                .segmentGranularity(Granularity.HOUR)
-                                .windowPeriod(new Period("PT10M"))
-                                .partitions(1)
-                                .replicants(1)
-                                .build()
-                )
-                .druidBeamConfig(
-                      DruidBeamConfig
-                           .builder()
-                           .indexRetryPeriod(new Period("PT10M"))
-                           .build())
-                .buildBeam();
-
-        return beam;
-    }
-}
-
-```
-
-Example code is available [here.](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid)

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/README.md
----------------------------------------------------------------------
diff --git a/external/storm-druid/README.md b/external/storm-druid/README.md
deleted file mode 100644
index de7f44c..0000000
--- a/external/storm-druid/README.md
+++ /dev/null
@@ -1,147 +0,0 @@
-# Storm Druid Bolt and TridentState
-
-This module provides core Storm and Trident bolt implementations for writing data to [Druid](http://druid.io/) data store.
-This implementation uses Druid's [Tranquility library](https://github.com/druid-io/tranquility) to send messages to druid.
-
-Some of the implementation details are borrowed from existing [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md).
-This new Bolt added to support latest storm release and maintain the bolt in the storm repo.
-
-### Core Bolt
-Below example describes the usage of core bolt which is `org.apache.storm.druid.bolt.DruidBeamBolt`
-By default this Bolt expects to receive tuples in which "event" field gives your event type.
-This logic can be changed by implementing ITupleDruidEventMapper interface.
-
-```java
-
-   DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
-   DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
-   ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-   DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
-   topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
-   topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());
-
-```
-
-
-### Trident State
-
-```java
-    DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
-    ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-
-    final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
-
-    stream.peek(new Consumer() {
-        @Override
-        public void accept(TridentTuple input) {
-             LOG.info("########### Received tuple: [{}]", input);
-         }
-    }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
-
-```
-
-### Sample Beam Factory Implementation
-Druid bolt must be supplied with a BeamFactory. You can implement one of these using the [DruidBeams builder's] (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) "buildBeam()" method.
-See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
-For more details refer [Tranquility library](https://github.com/druid-io/tranquility) docs.
-
-```java
-
-public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
-
-    @Override
-    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
-
-
-        final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
-        final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
-        final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables.
-        final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
-        List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
-                new CountAggregatorFactory(
-                        "click"
-                )
-        );
-        // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
-        final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
-        {
-            @Override
-            public DateTime timestamp(Map<String, Object> theMap)
-            {
-                return new DateTime(theMap.get("timestamp"));
-            }
-        };
-
-        // Tranquility uses ZooKeeper (through Curator) for coordination.
-        final CuratorFramework curator = CuratorFrameworkFactory
-                .builder()
-                .connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf
-                .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
-                .build();
-        curator.start();
-
-        // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
-        // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
-        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
-
-        // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
-        // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.
-        // In this case, we won't provide one, so we're just using Jackson.
-        final Beam<Map<String, Object>> beam = DruidBeams
-                .builder(timestamper)
-                .curator(curator)
-                .discoveryPath(discoveryPath)
-                .location(DruidLocation.create(indexService, dataSource))
-                .timestampSpec(timestampSpec)
-                .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
-                .tuning(
-                        ClusteredBeamTuning
-                                .builder()
-                                .segmentGranularity(Granularity.HOUR)
-                                .windowPeriod(new Period("PT10M"))
-                                .partitions(1)
-                                .replicants(1)
-                                .build()
-                )
-                .druidBeamConfig(
-                      DruidBeamConfig
-                           .builder()
-                           .indexRetryPeriod(new Period("PT10M"))
-                           .build())
-                .buildBeam();
-
-        return beam;
-    }
-}
-
-```
-
-Example code is available [here.](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid)
-
-This version is built to work with Druid 0.8.x. This connector uses the Tranquility module built for Scala 2.11.
-This provides a shaded jar with all Tranquility dependencies. You should include Scala 2.11 dependency in your
-application.
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
- * Sriharha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
- * Satish Duggana ([satishd@apache.org](mailto:satishd@apache.org))

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-druid/pom.xml b/external/storm-druid/pom.xml
deleted file mode 100644
index 831aa31..0000000
--- a/external/storm-druid/pom.xml
+++ /dev/null
@@ -1,111 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.0.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>storm-druid</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-client</artifactId>
-            <version>${project.version}</version>
-            <scope>${provided.scope}</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>io.druid</groupId>
-            <artifactId>tranquility-core_2.11</artifactId>
-            <version>${druid.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.sun.jersey</groupId>
-                    <artifactId>jersey-server</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jersey</groupId>
-                    <artifactId>jersey-core</artifactId>
-                </exclusion>
-                <exclusion>
-                  <groupId>mysql</groupId>
-                  <artifactId>mysql-connector-java</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>2.11.8</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.twitter</groupId>
-            <artifactId>util-core_2.11</artifactId>
-            <version>6.30.0</version>
-        </dependency>
-        <!-- tranquility library depends on jackson 2.4.6 version -->
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-            <version>2.4.6</version>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-            <version>2.4.6</version>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.dataformat</groupId>
-            <artifactId>jackson-dataformat-smile</artifactId>
-            <version>2.4.6</version>
-        </dependency>
-
-        <!--test dependencies -->
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <!--Note - the version would be inherited-->
-                <configuration>
-                    <maxAllowedViolations>29</maxAllowedViolations>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
deleted file mode 100644
index 92b8a21..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.bolt;
-
-import com.metamx.tranquility.tranquilizer.MessageDroppedException;
-import com.metamx.tranquility.tranquilizer.Tranquilizer;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * Basic bolt implementation for storing data to Druid datastore.
- * <p/>
- * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
- * to send to druid store.
- * Some of the concepts are borrowed from Tranquility storm connector implementation.
- * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
- *
- * By default this Bolt expects to receive tuples in which "event" field gives your event type.
- * This logic can be changed by implementing ITupleDruidEventMapper interface.
- * <p/>
- *
- */
-public class DruidBeamBolt<E> extends BaseTickTupleAwareRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
-
-    private volatile  OutputCollector collector;
-    private DruidBeamFactory<E> beamFactory = null;
-    private DruidConfig druidConfig = null;
-    private Tranquilizer<E> tranquilizer = null;
-    private ITupleDruidEventMapper<E> druidEventMapper = null;
-
-    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper, DruidConfig.Builder druidConfigBuilder) {
-        this.beamFactory = beamFactory;
-        this.druidConfig = druidConfigBuilder.build();
-        this.druidEventMapper = druidEventMapper;
-    }
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-        tranquilizer = Tranquilizer.builder()
-                .maxBatchSize(druidConfig.getMaxBatchSize())
-                .maxPendingBatches(druidConfig.getMaxPendingBatches())
-                .lingerMillis(druidConfig.getLingerMillis())
-                .blockOnFull(druidConfig.isBlockOnFull())
-                .build(beamFactory.makeBeam(topoConf, context));
-        this.tranquilizer.start();
-    }
-
-    @Override
-    protected void process(final Tuple tuple) {
-        final E mappedEvent = druidEventMapper.getEvent(tuple);
-        Future future = tranquilizer.send(mappedEvent);
-        LOG.debug("Sent tuple : [{}]", mappedEvent);
-
-        future.addEventListener(new FutureEventListener() {
-            @Override
-            public void onFailure(Throwable cause) {
-                if (cause instanceof MessageDroppedException) {
-                    collector.ack(tuple);
-                    LOG.debug("Tuple Dropped due to MessageDroppedException {} : [{}]", cause.getMessage(), mappedEvent);
-                    if (druidConfig.getDiscardStreamId() != null)
-                        collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis()));
-                } else {
-                    collector.fail(tuple);
-                    LOG.error("Tuple Processing Failed : [{}]", mappedEvent, cause);
-                }
-            }
-
-            @Override
-            public void onSuccess(Object value) {
-                collector.ack(tuple);
-                LOG.debug("Tuple Processing Success : [{}]", mappedEvent);
-            }
-        });
-
-    }
-
-    @Override
-    public void cleanup() {
-        tranquilizer.stop();
-    }
-
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declareStream(druidConfig.getDiscardStreamId(), new Fields("tuple", "timestamp"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
deleted file mode 100644
index 7d1866f..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.bolt;
-
-import com.metamx.tranquility.beam.Beam;
-import org.apache.storm.task.IMetricsContext;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public interface DruidBeamFactory<E>  extends  Serializable {
-    public Beam<E> makeBeam(Map<?,?> conf, IMetricsContext metrics);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
deleted file mode 100644
index 081d9ff..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.bolt;
-
-import com.metamx.tranquility.tranquilizer.Tranquilizer;
-
-import java.io.Serializable;
-
-public class DruidConfig implements Serializable {
-
-    public static final String DEFAULT_DISCARD_STREAM_ID = "druid-discard-stream";
-
-    //Tranquilizer configs for DruidBeamBolt
-    private int maxBatchSize;
-    private int maxPendingBatches;
-    private long lingerMillis;
-    private boolean blockOnFull;
-    private String discardStreamId;
-
-    public int getMaxBatchSize() {
-        return maxBatchSize;
-    }
-
-    public int getMaxPendingBatches() {
-        return maxPendingBatches;
-    }
-
-    public long getLingerMillis() {
-        return lingerMillis;
-    }
-
-    public boolean isBlockOnFull() {
-        return blockOnFull;
-    }
-
-    public String getDiscardStreamId() {
-        return discardStreamId;
-    }
-
-    private DruidConfig(Builder builder) {
-        this.maxBatchSize = builder.maxBatchSize;
-        this.maxPendingBatches = builder.maxPendingBatches;
-        this.lingerMillis = builder.lingerMillis;
-        this.blockOnFull = builder.blockOnFull;
-        this.discardStreamId = builder.discardStreamId;
-    }
-
-    public static DruidConfig.Builder newBuilder() {
-        return new Builder();
-    }
-
-    public static class Builder {
-        private int maxBatchSize = Tranquilizer.DefaultMaxBatchSize();
-        private int maxPendingBatches = Tranquilizer.DefaultMaxPendingBatches();
-        private long lingerMillis = Tranquilizer.DefaultLingerMillis();
-        private boolean blockOnFull =  Tranquilizer.DefaultBlockOnFull();
-        private String discardStreamId = null;
-
-        public Builder maxBatchSize(int maxBatchSize) {
-            this.maxBatchSize = maxBatchSize;
-            return this;
-        }
-
-        public Builder maxPendingBatches(int maxPendingBatches) {
-            this.maxPendingBatches = maxPendingBatches;
-            return this;
-        }
-
-        public Builder lingerMillis(int lingerMillis) {
-            this.lingerMillis = lingerMillis;
-            return this;
-        }
-
-        public Builder blockOnFull(boolean blockOnFull) {
-            this.blockOnFull = blockOnFull;
-            return this;
-        }
-
-        public Builder discardStreamId(String discardStreamId) {
-            this.discardStreamId = discardStreamId;
-            return this;
-        }
-
-        public DruidConfig build() {
-            return new DruidConfig(this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
deleted file mode 100644
index 0ae0233..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.bolt;
-
-import org.apache.storm.tuple.ITuple;
-
-import java.io.Serializable;
-
-/**
- * This class gives a mapping of a {@link ITuple} to Druid Event
- *
- */
-public interface ITupleDruidEventMapper<E> extends Serializable {
-
-    /**
-     * Returns a Druid Event for a given {@code tuple}.
-     *
-     * @param tuple tuple instance
-     */
-    public E getEvent(ITuple tuple);
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
deleted file mode 100644
index 67b7cc0..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.bolt;
-
-import org.apache.storm.tuple.ITuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Converts {@link ITuple} to Event
- */
-public final class TupleDruidEventMapper<E> implements ITupleDruidEventMapper<E> {
-
-    public static final String DEFAULT_FIELD_NAME = "event";
-
-    private final String eventFiledName;
-
-    public TupleDruidEventMapper(String eventFiledName) {
-        this.eventFiledName =  eventFiledName;
-    }
-
-    @Override
-    public E getEvent(ITuple tuple) {
-        return (E) tuple.getValueByField(eventFiledName);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
deleted file mode 100644
index e59fea9..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.trident;
-
-import com.metamx.tranquility.beam.Beam;
-import com.metamx.tranquility.beam.SendResult;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-
-/**
- * Trident {@link State} implementation for Druid.
- */
-public class DruidBeamState<E> implements State {
-    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);
-
-    private Beam<E> beam = null;
-    private ITupleDruidEventMapper<E> druidEventMapper = null;
-
-    public DruidBeamState(Beam<E> beam, ITupleDruidEventMapper<E> druidEventMapper) {
-        this.beam = beam;
-        this.druidEventMapper = druidEventMapper;
-    }
-
-    public  List<E> update(List<TridentTuple> tuples, TridentCollector collector) {
-        List<E> events = new ArrayList<>(tuples.size());
-        for (TridentTuple tuple: tuples) {
-            events.add(druidEventMapper.getEvent(tuple));
-        }
-
-        LOG.info("Sending [{}] events", events.size());
-        scala.collection.immutable.List<E> scalaList = scala.collection.JavaConversions.collectionAsScalaIterable(events).toList();
-        Collection<Future<SendResult>> futureList = scala.collection.JavaConversions.asJavaCollection(beam.sendAll(scalaList));
-        List<E> discardedEvents = new ArrayList<>();
-
-        int index = 0;
-        for (Future<SendResult> future : futureList) {
-            try {
-                SendResult result = Await.result(future);
-                if (!result.sent()) {
-                    discardedEvents.add(events.get(index));
-                }
-            } catch (Exception e) {
-                LOG.error("Failed in writing messages to Druid", e);
-            }
-            index++;
-        }
-
-        return discardedEvents;
-
-    }
-
-    public void close() {
-        try {
-            Await.result(beam.close());
-        } catch (Exception e) {
-            LOG.error("Error while closing Druid beam client", e);
-        }
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-
-    }
-
-    @Override
-    public void commit(Long txid) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
deleted file mode 100644
index bb61005..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.trident;
-
-import org.apache.storm.druid.bolt.DruidBeamFactory;
-import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-import java.util.Map;
-
-public class DruidBeamStateFactory<E> implements StateFactory {
-    DruidBeamFactory beamFactory = null;
-    ITupleDruidEventMapper druidEventMapper = null;
-
-    public DruidBeamStateFactory(DruidBeamFactory<E> beamFactory,  ITupleDruidEventMapper<E> druidEventMapper) {
-        this.beamFactory = beamFactory;
-        this.druidEventMapper = druidEventMapper;
-    }
-
-    @Override
-    public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-        return new DruidBeamState(beamFactory.makeBeam(conf , metrics), druidEventMapper);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
deleted file mode 100644
index d8e2b78..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.trident;
-
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-public class DruidBeamStateUpdater<E> extends BaseStateUpdater<DruidBeamState<E>> {
-    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamStateUpdater.class);
-
-    @Override
-    public void updateState(DruidBeamState<E> state, List<TridentTuple> tuples, TridentCollector collector) {
-        List<E> discardedTuples = state.update(tuples, collector);
-        processDiscardedTuples(discardedTuples);
-    }
-
-    /**
-     * Users can override this method to  process the discarded Tuples
-     * @param discardedTuples
-     */
-    protected void processDiscardedTuples(List<E> discardedTuples) {
-        LOG.debug("discarded messages : [{}]" , discardedTuples);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
deleted file mode 100644
index 179b4f4..0000000
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.druid;
-
-import com.google.common.collect.ImmutableList;
-import com.metamx.common.Granularity;
-import com.metamx.tranquility.beam.Beam;
-import com.metamx.tranquility.beam.ClusteredBeamTuning;
-import com.metamx.tranquility.druid.DruidBeamConfig;
-import com.metamx.tranquility.druid.DruidBeams;
-import com.metamx.tranquility.druid.DruidDimensions;
-import com.metamx.tranquility.druid.DruidLocation;
-import com.metamx.tranquility.druid.DruidRollup;
-import com.metamx.tranquility.typeclass.Timestamper;
-import io.druid.data.input.impl.TimestampSpec;
-import io.druid.granularity.QueryGranularities;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.storm.druid.bolt.DruidBeamFactory;
-import org.apache.storm.task.IMetricsContext;
-import org.joda.time.DateTime;
-import org.joda.time.Period;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Druid bolt must be supplied with a BeamFactory. You can implement one of these using the
- * [DruidBeams builder's] (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala)
- * "buildBeam()" method. See the [Configuration documentation] (https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
- * For more details refer [Tranquility library] (https://github.com/druid-io/tranquility) docs.
- */
-public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
-    Map<String, Object> factoryConf = null;
-
-
-    public SampleDruidBeamFactoryImpl(Map<String, Object> factoryConf) {
-        this.factoryConf = factoryConf; // This can be used to pass config values
-    }
-
-    @Override
-    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
-
-
-        final String indexService = "druid/overlord"; // Your overlord's druid.service
-        final String discoveryPath = "/druid/discovery"; // Your overlord's druid.discovery.curator.path
-        final String dataSource = "test";
-        final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
-        List<AggregatorFactory> aggregator = ImmutableList.<AggregatorFactory>of(
-                new CountAggregatorFactory(
-                        "click"
-                )
-        );
-        // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
-        final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
-        {
-            @Override
-            public DateTime timestamp(Map<String, Object> theMap)
-            {
-                return new DateTime(theMap.get("timestamp"));
-            }
-        };
-
-        // Tranquility uses ZooKeeper (through Curator) for coordination.
-        final CuratorFramework curator = CuratorFrameworkFactory
-                .builder()
-                .connectString((String)conf.get("druid.tranquility.zk.connect")) // we can use Storm conf to get config values
-                .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
-                .build();
-        curator.start();
-
-        // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
-        // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
-        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
-
-        // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
-        // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.
-        // In this case, we won't provide one, so we're just using Jackson.
-        final Beam<Map<String, Object>> beam = DruidBeams
-                .builder(timestamper)
-                .curator(curator)
-                .discoveryPath(discoveryPath)
-                .location(DruidLocation.create(indexService, dataSource))
-                .timestampSpec(timestampSpec)
-                .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularities.MINUTE))
-                .tuning(
-                        ClusteredBeamTuning
-                                .builder()
-                                .segmentGranularity(Granularity.HOUR)
-                                .windowPeriod(new Period("PT10M"))
-                                .partitions(1)
-                                .replicants(1)
-                                .build()
-                )
-                .druidBeamConfig(
-                        DruidBeamConfig
-                                .builder()
-                                .indexRetryPeriod(new Period("PT10M"))
-                                .build())
-                .buildBeam();
-
-        return beam;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
deleted file mode 100644
index 298353c..0000000
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.druid.bolt.DruidBeamBolt;
-import org.apache.storm.druid.bolt.DruidBeamFactory;
-import org.apache.storm.druid.bolt.DruidConfig;
-import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
-import org.apache.storm.druid.bolt.TupleDruidEventMapper;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Sample application to use Druid bolt.
- *
- * To test this we need to deploy Druid application. Refer Druid quickstart to run druid.
- * http://druid.io/docs/latest/tutorials/quickstart.html
- */
-public class SampleDruidBoltTopology {
-
-    public static void main(String[] args) throws Exception {
-        if(args.length == 0) {
-          throw new IllegalArgumentException("There should be at least one argument. Run as `SampleDruidBoltTopology <zk-url>`");
-        }
-
-        TopologyBuilder topologyBuilder = new TopologyBuilder();
-
-        topologyBuilder.setSpout("event-gen", new SimpleSpout(), 5);
-        DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
-        DruidConfig.Builder builder = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID);
-        ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-        DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, builder);
-        topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
-        topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , DruidConfig.DEFAULT_DISCARD_STREAM_ID);
-
-        Config conf = new Config();
-        conf.setDebug(true);
-        conf.put("druid.tranquility.zk.connect", args[0]);
-
-        if (args.length > 1) {
-            conf.setNumWorkers(3);
-
-            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, topologyBuilder.createTopology());
-        } else {
-            conf.setMaxTaskParallelism(3);
-
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("druid-test", conf, topologyBuilder.createTopology());) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
-        }
-    }
-
-    private static class PrinterBolt extends BaseBasicBolt {
-
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            System.out.println(tuple);
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer ofd) {
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
deleted file mode 100644
index 3de18c1..0000000
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.druid.bolt.DruidBeamFactory;
-import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
-import org.apache.storm.druid.bolt.TupleDruidEventMapper;
-import org.apache.storm.druid.trident.DruidBeamStateFactory;
-import org.apache.storm.druid.trident.DruidBeamStateUpdater;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.Consumer;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Sample application to use Druid Trident bolt.
- *
- * To test this we need to deploy Druid application. Refer Druid quickstart to run druid.
- * http://druid.io/docs/latest/tutorials/quickstart.html
- */
-public class SampleDruidBoltTridentTopology {
-    private static final Logger LOG = LoggerFactory.getLogger(SampleDruidBoltTridentTopology.class);
-
-    public static void main(String[] args) throws Exception {
-        if(args.length == 0) {
-          throw new IllegalArgumentException("There should be at least one argument. Run as `SampleDruidBoltTridentTopology <zk-url>`");
-        }
-
-        TridentTopology tridentTopology = new TridentTopology();
-        DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
-        ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-
-        final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
-
-        stream.peek(new Consumer() {
-            @Override
-            public void accept(TridentTuple input) {
-                LOG.info("########### Received tuple: [{}]", input);
-            }
-        }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
-
-        Config conf = new Config();
-
-
-
-        conf.setDebug(true);
-        conf.put("druid.tranquility.zk.connect", args[0]);
-
-        if (args.length > 1) {
-            conf.setNumWorkers(3);
-
-            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, tridentTopology.build());
-        } else {
-            conf.setMaxTaskParallelism(3);
-
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("druid-test", conf, tridentTopology.build());) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
deleted file mode 100644
index 2511ac8..0000000
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid;
-
-import org.apache.storm.Config;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.joda.time.DateTime;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * BatchSpout implementation for event batch  generation.
- */
-public class SimpleBatchSpout implements IBatchSpout {
-
-    private int batchSize;
-    private final Map<Long, List<List<Object>>> batches = new HashMap<>();
-
-    public SimpleBatchSpout(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context) {
-    }
-
-    @Override
-    public void emitBatch(long batchId, TridentCollector collector) {
-        List<List<Object>> values;
-        if(batches.containsKey(batchId)) {
-            values = batches.get(batchId);
-        } else {
-            values = new ArrayList<>();
-            for (int i = 0; i < batchSize; i++) {
-                List<Object> value = new ArrayList<>();
-                Map<String, Object> event = new LinkedHashMap<>();
-                event.put("timestamp", new DateTime().toString());
-                event.put("publisher", "foo.com");
-                event.put("advertiser", "google.com");
-                event.put("click", i);
-                value.add(event);
-                values.add(value);
-            }
-            batches.put(batchId, values);
-        }
-        for (List<Object> value : values) {
-            collector.emit(value);
-        }
-
-    }
-
-    @Override
-    public void ack(long batchId) {
-        batches.remove(batchId);
-    }
-
-    @Override
-    public void close() {
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config conf = new Config();
-        conf.setMaxTaskParallelism(1);
-        return conf;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return SimpleSpout.DEFAULT_FIELDS;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
deleted file mode 100644
index d2010e9..0000000
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.druid;
-
-import org.apache.storm.druid.bolt.TupleDruidEventMapper;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.joda.time.DateTime;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class SimpleSpout extends BaseRichSpout {
-    SpoutOutputCollector _collector;
-    int i = 1;
-
-    public static final Fields DEFAULT_FIELDS = new Fields(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
-        _collector = collector;
-    }
-
-    @Override
-    public void nextTuple() {
-        Utils.sleep(1000);
-        Map<String, Object> event = new LinkedHashMap<>();
-        event.put("timestamp", new DateTime().toString());
-        event.put("publisher", "foo.com");
-        event.put("advertiser", "google.com");
-        event.put("click", i++);
-        _collector.emit(new Values(event));
-    }
-
-    @Override
-    public void ack(Object id) {
-    }
-
-    @Override
-    public void fail(Object id) {
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(DEFAULT_FIELDS);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 009d0f0..b5f3a5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -381,7 +381,6 @@
         <module>external/storm-opentsdb</module>
         <module>external/storm-kafka-monitor</module>
         <module>external/storm-kinesis</module>
-        <module>external/storm-druid</module>
         <module>external/storm-jms</module>
         <module>external/storm-pmml</module>
         <module>external/storm-rocketmq</module>


[2/2] storm git commit: Merge branch 'STORM-3097-merge'

Posted by ka...@apache.org.
Merge branch 'STORM-3097-merge'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0eb6b511
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0eb6b511
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0eb6b511

Branch: refs/heads/master
Commit: 0eb6b5116f251e17b6f14a61cebfadfc286faa59
Parents: 4aa6b5e b6cab8d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Jun 9 20:44:03 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jun 9 20:44:03 2018 +0900

----------------------------------------------------------------------
 docs/index.md                                   |   1 -
 docs/storm-druid.md                             | 119 ---------------
 external/storm-druid/README.md                  | 147 -------------------
 external/storm-druid/pom.xml                    | 111 --------------
 .../apache/storm/druid/bolt/DruidBeamBolt.java  | 116 ---------------
 .../storm/druid/bolt/DruidBeamFactory.java      |  29 ----
 .../apache/storm/druid/bolt/DruidConfig.java    | 104 -------------
 .../druid/bolt/ITupleDruidEventMapper.java      |  38 -----
 .../storm/druid/bolt/TupleDruidEventMapper.java |  44 ------
 .../storm/druid/trident/DruidBeamState.java     |  96 ------------
 .../druid/trident/DruidBeamStateFactory.java    |  42 ------
 .../druid/trident/DruidBeamStateUpdater.java    |  48 ------
 .../storm/druid/SampleDruidBeamFactoryImpl.java | 122 ---------------
 .../storm/druid/SampleDruidBoltTopology.java    |  94 ------------
 .../druid/SampleDruidBoltTridentTopology.java   |  90 ------------
 .../apache/storm/druid/SimpleBatchSpout.java    |  95 ------------
 .../org/apache/storm/druid/SimpleSpout.java     |  68 ---------
 pom.xml                                         |   1 -
 18 files changed, 1365 deletions(-)
----------------------------------------------------------------------