Posted to commits@storm.apache.org by sa...@apache.org on 2016/08/09 05:16:56 UTC
[1/3] storm git commit: STORM-1979: Storm Druid Connector implementation. This uses Druid's tranquility library.
Repository: storm
Updated Branches:
refs/heads/master 28563ece1 -> f5c55ac60
STORM-1979: Storm Druid Connector implementation. This uses Druid's tranquility library.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/422e0534
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/422e0534
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/422e0534
Branch: refs/heads/master
Commit: 422e0534f1a22b65f2ffcbf5e9a3884a619434f1
Parents: 44068c4
Author: Manikumar Reddy O <ma...@gmail.com>
Authored: Thu Jul 21 11:57:03 2016 +0000
Committer: Manikumar Reddy O <ma...@gmail.com>
Committed: Mon Aug 8 09:38:34 2016 +0530
----------------------------------------------------------------------
external/storm-druid/README.md | 143 +++++++++++++++++++
external/storm-druid/pom.xml | 83 +++++++++++
.../apache/storm/druid/bolt/DruidBeamBolt.java | 110 ++++++++++++++
.../storm/druid/bolt/DruidBeamFactory.java | 29 ++++
.../apache/storm/druid/bolt/DruidConfig.java | 104 ++++++++++++++
.../druid/bolt/ITupleDruidEventMapper.java | 38 +++++
.../storm/druid/bolt/TupleDruidEventMapper.java | 44 ++++++
.../storm/druid/trident/DruidBeamState.java | 96 +++++++++++++
.../druid/trident/DruidBeamStateFactory.java | 42 ++++++
.../druid/trident/DruidBeamStateUpdater.java | 48 +++++++
.../storm/druid/SampleDruidBeamFactoryImpl.java | 122 ++++++++++++++++
.../storm/druid/SampleDruidBoltTopology.java | 95 ++++++++++++
.../druid/SampleDruidBoltTridentTopology.java | 91 ++++++++++++
.../apache/storm/druid/SimpleBatchSpout.java | 95 ++++++++++++
.../org/apache/storm/druid/SimpleSpout.java | 68 +++++++++
pom.xml | 1 +
storm-dist/binary/src/main/assembly/binary.xml | 15 +-
17 files changed, 1223 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/README.md
----------------------------------------------------------------------
diff --git a/external/storm-druid/README.md b/external/storm-druid/README.md
new file mode 100644
index 0000000..75434f7
--- /dev/null
+++ b/external/storm-druid/README.md
@@ -0,0 +1,143 @@
+# Storm Druid Bolt and TridentState
+
+This module provides a core Storm bolt and a Trident state implementation for writing data to the [Druid](http://druid.io/) data store.
+This implementation uses Druid's [Tranquility library](https://github.com/druid-io/tranquility) to send messages to Druid.
+
+Some of the implementation details are borrowed from the existing [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md).
+This new bolt was added to support the latest Storm release and to maintain the bolt in the Storm repo.
+
+### Core Bolt
+The example below shows how to use the core bolt, `org.apache.storm.druid.bolt.DruidBeamBolt`.
+By default, this bolt expects to receive tuples in which the "event" field holds the event to send to Druid.
+This logic can be changed by implementing the `ITupleDruidEventMapper` interface.
+
+```java
+
+ DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+ DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
+ ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+ DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
+ topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
+ topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());
+
+```
+
+
+### Trident State
+
+```java
+ DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+ ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+
+ final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
+
+ stream.peek(new Consumer() {
+ @Override
+ public void accept(TridentTuple input) {
+ LOG.info("########### Received tuple: [{}]", input);
+ }
+ }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
+
+```
+
+### Sample Beam Factory Implementation
+Both the Druid bolt and the Trident state must be supplied with a `DruidBeamFactory`. You can implement one using the [DruidBeams builder's](https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) `buildBeam()` method.
+See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
+For more details, refer to the [Tranquility library](https://github.com/druid-io/tranquility) docs.
+
+```java
+
+public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
+
+ @Override
+ public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
+
+
+ final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
+ final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
+ final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables.
+ final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
+ List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
+ new CountAggregatorFactory(
+ "click"
+ )
+ );
+ // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
+ final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
+ {
+ @Override
+ public DateTime timestamp(Map<String, Object> theMap)
+ {
+ return new DateTime(theMap.get("timestamp"));
+ }
+ };
+
+ // Tranquility uses ZooKeeper (through Curator) for coordination.
+ final CuratorFramework curator = CuratorFrameworkFactory
+ .builder()
+ .connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf
+ .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
+ .build();
+ curator.start();
+
+ // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
+ // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
+ final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
+
+ // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
+ // done with Jackson. If you want to provide an alternate serializer, you can provide your own via .objectWriter(...).
+ // In this case, we won't provide one, so we're just using Jackson.
+ final Beam<Map<String, Object>> beam = DruidBeams
+ .builder(timestamper)
+ .curator(curator)
+ .discoveryPath(discoveryPath)
+ .location(DruidLocation.create(indexService, dataSource))
+ .timestampSpec(timestampSpec)
+ .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
+ .tuning(
+ ClusteredBeamTuning
+ .builder()
+ .segmentGranularity(Granularity.HOUR)
+ .windowPeriod(new Period("PT10M"))
+ .partitions(1)
+ .replicants(1)
+ .build()
+ )
+ .druidBeamConfig(
+ DruidBeamConfig
+ .builder()
+ .indexRetryPeriod(new Period("PT10M"))
+ .build())
+ .buildBeam();
+
+ return beam;
+ }
+}
+
+```
+
+Example code is available [here.](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid)
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
+ * Satish Duggana ([satishd@apache.org](mailto:satishd@apache.org))
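The sample factory in the README tunes the beam with hourly segments and a `PT10M` window period. Tranquility's documentation describes events whose timestamps fall outside the window period around the current time as dropped, and `DruidBeamBolt`'s failure callback acks such drops (`MessageDroppedException`) instead of failing them. The following is a simplified model of that acceptance rule under those assumptions, not Tranquility's actual code; it assumes ISO8601 timestamps ending in `Z`, matching the sample's `TimestampSpec("timestamp", "auto", null)`, and the class name is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of a window-period check; NOT Tranquility's implementation.
final class WindowPeriodSketch {
    private WindowPeriodSketch() { }

    // Returns true if the event's ISO8601 instant (e.g. "2016-08-08T10:05:00Z") lies
    // no more than windowPeriod before or after 'now'. Under this model such events
    // are accepted; anything further away would be dropped.
    static boolean withinWindow(String isoTimestamp, Instant now, Duration windowPeriod) {
        Instant eventTime = Instant.parse(isoTimestamp);
        Duration skew = Duration.between(eventTime, now).abs();
        return skew.compareTo(windowPeriod) <= 0;
    }
}
```

With `now` at `2016-08-08T10:30:00Z` and `Duration.parse("PT10M")`, an event stamped `10:25` is inside the window, while one stamped `10:00` (30 minutes old) or `10:45` (15 minutes ahead) is outside it. This is why clocks on event producers matter: skewed timestamps turn into silent acks on the discard path rather than replays.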
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-druid/pom.xml b/external/storm-druid/pom.xml
new file mode 100644
index 0000000..0f50d76
--- /dev/null
+++ b/external/storm-druid/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>storm-druid</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>tranquility-core_2.11</artifactId>
+ <version>0.8.2</version>
+ </dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>druid-server</artifactId>
+ <version>0.9.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.11.8</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>util-core_2.11</artifactId>
+ <version>6.30.0</version>
+ </dependency>
+ <!-- tranquility library depends on jackson 2.4.6 version -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>2.4.6</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.4.6</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-smile</artifactId>
+ <version>2.4.6</version>
+ </dependency>
+
+ <!--test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
new file mode 100644
index 0000000..721eaa1
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.bolt;
+
+import com.metamx.tranquility.tranquilizer.MessageDroppedException;
+import com.metamx.tranquility.tranquilizer.Tranquilizer;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Basic bolt implementation for storing data in the Druid datastore.
+ * <p/>
+ * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
+ * to send events to the Druid store.
+ * Some of the concepts are borrowed from the Tranquility Storm connector implementation
+ * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md).
+ *
+ * By default, this bolt expects to receive tuples in which the "event" field holds the event to send.
+ * This logic can be changed by implementing the {@link ITupleDruidEventMapper} interface.
+ * <p/>
+ *
+ */
+public class DruidBeamBolt<E> extends BaseRichBolt {
+
+ private volatile OutputCollector collector;
+ private DruidBeamFactory<E> beamFactory = null;
+ private DruidConfig druidConfig = null;
+ private Tranquilizer<E> tranquilizer = null;
+ private ITupleDruidEventMapper<E> druidEventMapper = null;
+
+ public DruidBeamBolt(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper, DruidConfig druidConfig) {
+ this.beamFactory = beamFactory;
+ this.druidConfig = druidConfig;
+ this.druidEventMapper = druidEventMapper;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ tranquilizer = Tranquilizer.builder()
+ .maxBatchSize(druidConfig.getMaxBatchSize())
+ .maxPendingBatches(druidConfig.getMaxPendingBatches())
+ .lingerMillis(druidConfig.getLingerMillis())
+ .blockOnFull(druidConfig.isBlockOnFull())
+ .build(beamFactory.makeBeam(stormConf, context));
+ this.tranquilizer.start();
+ }
+
+ @Override
+ public void execute(final Tuple tuple) {
+ Future future = tranquilizer.send(druidEventMapper.getEvent(tuple));
+ future.addEventListener(new FutureEventListener() {
+ @Override
+ public void onFailure(Throwable cause) {
+ if (cause instanceof MessageDroppedException) {
+ collector.ack(tuple);
+ if (druidConfig.getDiscardStreamId() != null)
+ collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis()));
+ }
+ else {
+ collector.fail(tuple);
+ }
+ }
+
+ @Override
+ public void onSuccess(Object value) {
+ collector.ack(tuple);
+ }
+ });
+
+ }
+
+ @Override
+ public void cleanup() {
+ tranquilizer.stop();
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ if (druidConfig.getDiscardStreamId() != null)
+ declarer.declareStream(druidConfig.getDiscardStreamId(), new Fields("tuple", "timestamp"));
+ }
+}
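The callback registered in `DruidBeamBolt.execute` routes each send result three ways: success acks the tuple, a `MessageDroppedException` acks it and, when a discard stream is configured, emits it on that stream, and any other failure fails the tuple so Storm can replay it. The minimal stdlib sketch below reproduces that routing under stated assumptions: `CompletableFuture` stands in for Twitter's `Future`, the hypothetical `SketchDroppedException` stands in for Tranquility's exception, and decisions are recorded as strings instead of calls on an `OutputCollector`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for com.metamx.tranquility.tranquilizer.MessageDroppedException.
class SketchDroppedException extends RuntimeException {
    SketchDroppedException(String message) { super(message); }
}

// Records what the bolt would do with a tuple once its send future completes.
final class SendRoutingSketch {
    final List<String> actions = new ArrayList<>(); // not thread-safe; fine for a single-threaded demo
    private final String discardStreamId;           // may be null, as with DruidConfig's default

    SendRoutingSketch(String discardStreamId) { this.discardStreamId = discardStreamId; }

    void route(String tupleId, CompletableFuture<Object> sendFuture) {
        sendFuture.whenComplete((value, error) -> {
            if (error == null) {
                actions.add("ack " + tupleId);       // sent successfully
                return;
            }
            // Dependent stages wrap failures in CompletionException; unwrap if so.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (cause instanceof SketchDroppedException) {
                // Dropped by Druid: replaying will not help, so ack and optionally report it.
                actions.add("ack " + tupleId);
                if (discardStreamId != null) {
                    actions.add("emit " + tupleId + " on " + discardStreamId);
                }
            } else {
                // Any other failure is treated as retryable: fail so Storm replays the tuple.
                actions.add("fail " + tupleId);
            }
        });
    }
}
```

The design choice being modelled is that a drop is a permanent verdict (retrying the same timestamp would be dropped again), whereas transport errors are worth a replay.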
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
new file mode 100644
index 0000000..7d1866f
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.bolt;
+
+import com.metamx.tranquility.beam.Beam;
+import org.apache.storm.task.IMetricsContext;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public interface DruidBeamFactory<E> extends Serializable {
+ public Beam<E> makeBeam(Map<?,?> conf, IMetricsContext metrics);
+}
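`DruidBeamFactory` extends `Serializable` because the factory, not the beam, travels with the topology: `DruidBeamBolt` keeps the factory as a field and only calls `makeBeam` in `prepare`, so the Curator client and the beam are created on the worker. The stdlib sketch below illustrates that split with hypothetical classes: `ClientFactory` holds only serializable settings (a ZooKeeper connect string), survives a Java serialization round trip standing in for topology submission, and builds the non-serializable `HeavyClient` afterwards.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical non-serializable resource, standing in for a CuratorFramework or Beam.
final class HeavyClient {
    final String connectString;
    HeavyClient(String connectString) { this.connectString = connectString; }
}

// Hypothetical factory: only plain configuration is serialized; the client is built later.
final class ClientFactory implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String connectString;

    ClientFactory(String connectString) { this.connectString = connectString; }

    // Analogous to makeBeam(conf, metrics): called after deserialization, on the worker.
    HeavyClient makeClient() { return new HeavyClient(connectString); }

    // Simulates shipping an object to a worker by serializing and deserializing it.
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }
}
```

Serializing `HeavyClient` directly fails with `NotSerializableException`, which is the failure you would hit if a bolt stored a live client in a field at construction time instead of creating it in `prepare`.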
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
new file mode 100644
index 0000000..081d9ff
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.bolt;
+
+import com.metamx.tranquility.tranquilizer.Tranquilizer;
+
+import java.io.Serializable;
+
+public class DruidConfig implements Serializable {
+
+ public static final String DEFAULT_DISCARD_STREAM_ID = "druid-discard-stream";
+
+ //Tranquilizer configs for DruidBeamBolt
+ private int maxBatchSize;
+ private int maxPendingBatches;
+ private long lingerMillis;
+ private boolean blockOnFull;
+ private String discardStreamId;
+
+ public int getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ public int getMaxPendingBatches() {
+ return maxPendingBatches;
+ }
+
+ public long getLingerMillis() {
+ return lingerMillis;
+ }
+
+ public boolean isBlockOnFull() {
+ return blockOnFull;
+ }
+
+ public String getDiscardStreamId() {
+ return discardStreamId;
+ }
+
+ private DruidConfig(Builder builder) {
+ this.maxBatchSize = builder.maxBatchSize;
+ this.maxPendingBatches = builder.maxPendingBatches;
+ this.lingerMillis = builder.lingerMillis;
+ this.blockOnFull = builder.blockOnFull;
+ this.discardStreamId = builder.discardStreamId;
+ }
+
+ public static DruidConfig.Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private int maxBatchSize = Tranquilizer.DefaultMaxBatchSize();
+ private int maxPendingBatches = Tranquilizer.DefaultMaxPendingBatches();
+ private long lingerMillis = Tranquilizer.DefaultLingerMillis();
+ private boolean blockOnFull = Tranquilizer.DefaultBlockOnFull();
+ private String discardStreamId = null;
+
+ public Builder maxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ return this;
+ }
+
+ public Builder maxPendingBatches(int maxPendingBatches) {
+ this.maxPendingBatches = maxPendingBatches;
+ return this;
+ }
+
+ public Builder lingerMillis(long lingerMillis) {
+ this.lingerMillis = lingerMillis;
+ return this;
+ }
+
+ public Builder blockOnFull(boolean blockOnFull) {
+ this.blockOnFull = blockOnFull;
+ return this;
+ }
+
+ public Builder discardStreamId(String discardStreamId) {
+ this.discardStreamId = discardStreamId;
+ return this;
+ }
+
+ public DruidConfig build() {
+ return new DruidConfig(this);
+ }
+ }
+}
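`DruidConfig` only carries settings through to `Tranquilizer.builder()` in `DruidBeamBolt.prepare`, with defaults taken from `Tranquilizer.DefaultMaxBatchSize()`, `DefaultLingerMillis()` and so on. As a rough model of how the two batching settings interact, assuming Tranquilizer's documented meaning (send up to `maxBatchSize` messages at once, and wait up to `lingerMillis` for a batch to fill), a batch is flushed when it is full or when its oldest message has lingered long enough. This is a simplified illustration, not Tranquilizer's code; the class name and the explicit clock arguments are hypothetical, and `maxPendingBatches`/`blockOnFull` (back-pressure once too many batches are in flight) are not modelled.

```java
// Simplified model of Tranquilizer-style batching; NOT Tranquilizer's implementation.
final class BatchingSketch {
    private final int maxBatchSize;
    private final long lingerMillis;

    BatchingSketch(int maxBatchSize, long lingerMillis) {
        this.maxBatchSize = maxBatchSize;
        this.lingerMillis = lingerMillis;
    }

    // Decide whether the current batch should be sent now.
    // batchSize: messages buffered; firstMessageAtMillis: when the oldest one arrived.
    boolean shouldFlush(int batchSize, long firstMessageAtMillis, long nowMillis) {
        if (batchSize == 0) {
            return false;                      // nothing to send
        }
        if (batchSize >= maxBatchSize) {
            return true;                       // batch is full
        }
        // With lingerMillis == 0 any non-empty batch is sent immediately.
        return nowMillis - firstMessageAtMillis >= lingerMillis;
    }
}
```

Raising `lingerMillis` trades per-message latency for fewer, larger requests to Druid; lowering `maxBatchSize` caps the size of each request regardless of linger.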
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
new file mode 100644
index 0000000..0ae0233
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.bolt;
+
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * Maps an {@link ITuple} to a Druid event.
+ *
+ */
+public interface ITupleDruidEventMapper<E> extends Serializable {
+
+ /**
+ * Returns a Druid Event for a given {@code tuple}.
+ *
+ * @param tuple tuple instance
+ */
+ public E getEvent(ITuple tuple);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
new file mode 100644
index 0000000..67b7cc0
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.bolt;
+
+import org.apache.storm.tuple.ITuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts an {@link ITuple} to an event by reading a single named field (default: "event").
+ */
+public final class TupleDruidEventMapper<E> implements ITupleDruidEventMapper<E> {
+
+ public static final String DEFAULT_FIELD_NAME = "event";
+
+ private final String eventFieldName;
+
+ public TupleDruidEventMapper(String eventFieldName) {
+ this.eventFieldName = eventFieldName;
+ }
+
+ @Override
+ public E getEvent(ITuple tuple) {
+ return (E) tuple.getValueByField(eventFieldName);
+ }
+
+
+}
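`TupleDruidEventMapper` reads the whole event from one field. A custom `ITupleDruidEventMapper` can instead assemble the event from several fields. A minimal sketch, assuming `Map<String, Object>` events as in the samples: a plain `Map` stands in for Storm's `ITuple` so the sketch runs without Storm, and in a real `getEvent(ITuple tuple)` each value would come from `tuple.getValueByField(name)`. The class name is hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mapper logic: builds one Druid event from several named tuple fields.
final class MultiFieldEventMapperSketch {
    private final List<String> fieldNames;

    MultiFieldEventMapperSketch(String... fieldNames) {
        this.fieldNames = Collections.unmodifiableList(Arrays.asList(fieldNames));
    }

    // 'tupleFields' stands in for an ITuple; a real mapper would call
    // tuple.getValueByField(name) instead of tupleFields.get(name).
    Map<String, Object> getEvent(Map<String, Object> tupleFields) {
        Map<String, Object> event = new LinkedHashMap<>();
        for (String name : fieldNames) {
            if (!tupleFields.containsKey(name)) {
                throw new IllegalArgumentException("Missing tuple field: " + name);
            }
            event.put(name, tupleFields.get(name));
        }
        return event;
    }
}
```

Fields not listed (for example bookkeeping fields added upstream) are left out of the event, so only the timestamp, dimensions, and metric inputs the beam expects reach Druid.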
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
new file mode 100644
index 0000000..e59fea9
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.trident;
+
+import com.metamx.tranquility.beam.Beam;
+import com.metamx.tranquility.beam.SendResult;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * Trident {@link State} implementation for Druid.
+ */
+public class DruidBeamState<E> implements State {
+ private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);
+
+ private Beam<E> beam = null;
+ private ITupleDruidEventMapper<E> druidEventMapper = null;
+
+ public DruidBeamState(Beam<E> beam, ITupleDruidEventMapper<E> druidEventMapper) {
+ this.beam = beam;
+ this.druidEventMapper = druidEventMapper;
+ }
+
+ public List<E> update(List<TridentTuple> tuples, TridentCollector collector) {
+ List<E> events = new ArrayList<>(tuples.size());
+ for (TridentTuple tuple: tuples) {
+ events.add(druidEventMapper.getEvent(tuple));
+ }
+
+ LOG.info("Sending [{}] events", events.size());
+ scala.collection.immutable.List<E> scalaList = scala.collection.JavaConversions.collectionAsScalaIterable(events).toList();
+ Collection<Future<SendResult>> futureList = scala.collection.JavaConversions.asJavaCollection(beam.sendAll(scalaList));
+ List<E> discardedEvents = new ArrayList<>();
+
+ int index = 0;
+ for (Future<SendResult> future : futureList) {
+ try {
+ SendResult result = Await.result(future);
+ if (!result.sent()) {
+ discardedEvents.add(events.get(index));
+ }
+ } catch (Exception e) {
+ LOG.error("Failed in writing messages to Druid", e);
+ }
+ index++;
+ }
+
+ return discardedEvents;
+
+ }
+
+ public void close() {
+ try {
+ Await.result(beam.close());
+ } catch (Exception e) {
+ LOG.error("Error while closing Druid beam client", e);
+ }
+ }
+
+ @Override
+ public void beginCommit(Long txid) {
+
+ }
+
+ @Override
+ public void commit(Long txid) {
+
+ }
+}
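`DruidBeamState.update` assumes that `beam.sendAll` returns one future per event, in input order, so the loop index maps each `SendResult` back to its event. A stdlib sketch of that correlation, with `CompletableFuture<Boolean>` standing in for `Future<SendResult>` (`true` meaning `result.sent()`) and hypothetical names; like the original loop, a future that fails outright is only logged, so that event is neither reported as discarded nor retried.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch: pair each send result with its event by position.
final class DiscardCorrelationSketch {
    private DiscardCorrelationSketch() { }

    static <E> List<E> discarded(List<E> events, List<CompletableFuture<Boolean>> results) {
        if (events.size() != results.size()) {
            throw new IllegalArgumentException("expected one result per event");
        }
        List<E> discarded = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            try {
                if (!results.get(i).get()) {      // blocks, like Await.result(future)
                    discarded.add(events.get(i)); // not sent, e.g. dropped by Druid
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return discarded;
            } catch (ExecutionException e) {
                // Mirrors the original: log only; the event is not counted as discarded.
                System.err.println("send failed for event " + i + ": " + e.getCause());
            }
        }
        return discarded;
    }
}
```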
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
new file mode 100644
index 0000000..b745cdd
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.trident;
+
+import org.apache.storm.druid.bolt.DruidBeamFactory;
+import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+public class DruidBeamStateFactory<E> implements StateFactory {
+ private DruidBeamFactory<E> beamFactory = null;
+ private ITupleDruidEventMapper<E> druidEventMapper = null;
+
+ public DruidBeamStateFactory(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper) {
+ this.beamFactory = beamFactory;
+ this.druidEventMapper = druidEventMapper;
+ }
+
+ @Override
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ return new DruidBeamState(beamFactory.makeBeam(conf , metrics), druidEventMapper);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
new file mode 100644
index 0000000..d8e2b78
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.trident;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+public class DruidBeamStateUpdater<E> extends BaseStateUpdater<DruidBeamState<E>> {
+ private static final Logger LOG = LoggerFactory.getLogger(DruidBeamStateUpdater.class);
+
+ @Override
+ public void updateState(DruidBeamState<E> state, List<TridentTuple> tuples, TridentCollector collector) {
+ List<E> discardedTuples = state.update(tuples, collector);
+ processDiscardedTuples(discardedTuples);
+ }
+
+ /**
+ * Users can override this method to process discarded events. The default implementation only logs them at DEBUG level.
+ * @param discardedTuples the events that were not accepted by Druid
+ */
+ protected void processDiscardedTuples(List<E> discardedTuples) {
+ LOG.debug("Discarded messages: [{}]", discardedTuples);
+ }
+
+}
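DruidBeamStateUpdater exposes processDiscardedTuples() as an extension point for events that Druid did not accept. The Storm-free sketch below illustrates the override pattern using hypothetical stand-in classes (BaseUpdater, CollectingUpdater), not the real Storm types; a predicate stands in for whatever DruidBeamState.update() reports as discarded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class DiscardHookSketch {

    // Hypothetical stand-in for DruidBeamStateUpdater: update, then pass discards to a hook.
    static class BaseUpdater<E> {
        // Events that fail `accepted` play the role of the list DruidBeamState.update() returns.
        public void updateState(List<E> events, Predicate<E> accepted) {
            List<E> discarded = new ArrayList<>();
            for (E event : events) {
                if (!accepted.test(event)) {
                    discarded.add(event);
                }
            }
            processDiscardedTuples(discarded);
        }

        protected void processDiscardedTuples(List<E> discardedTuples) {
            // The real updater logs at DEBUG level here; this sketch does nothing.
        }
    }

    // Hypothetical user subclass that keeps discarded events, e.g. for a dead-letter store.
    static class CollectingUpdater<E> extends BaseUpdater<E> {
        final List<E> deadLetters = new ArrayList<>();

        @Override
        protected void processDiscardedTuples(List<E> discardedTuples) {
            deadLetters.addAll(discardedTuples);
        }
    }

    public static List<Integer> demo() {
        CollectingUpdater<Integer> updater = new CollectingUpdater<>();
        // Negative values stand in for events the sink refused.
        updater.updateState(Arrays.asList(3, -1, 5, -7), v -> v >= 0);
        return new ArrayList<>(updater.deadLetters);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [-1, -7]
    }
}
```

In a real topology, the subclass would extend DruidBeamStateUpdater<E> and be passed to partitionPersist() in place of the default updater.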
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
new file mode 100644
index 0000000..5c1b4ed
--- /dev/null
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.druid;
+
+import com.google.common.collect.ImmutableList;
+import com.metamx.common.Granularity;
+import com.metamx.tranquility.beam.Beam;
+import com.metamx.tranquility.beam.ClusteredBeamTuning;
+import com.metamx.tranquility.druid.DruidBeamConfig;
+import com.metamx.tranquility.druid.DruidBeams;
+import com.metamx.tranquility.druid.DruidDimensions;
+import com.metamx.tranquility.druid.DruidLocation;
+import com.metamx.tranquility.druid.DruidRollup;
+import com.metamx.tranquility.typeclass.Timestamper;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularities;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.storm.druid.bolt.DruidBeamFactory;
+import org.apache.storm.task.IMetricsContext;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Druid bolt must be supplied with a BeamFactory. You can implement one using the "buildBeam()" method of the
+ * DruidBeams builder (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala).
+ * See the configuration documentation (https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details,
+ * and refer to the Tranquility library docs (https://github.com/druid-io/tranquility) for more information.
+ */
+public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
+ private final Map<String, Object> factoryConf;
+
+
+ public SampleDruidBeamFactoryImpl(Map<String, Object> factoryConf) {
+ this.factoryConf = factoryConf; // This can be used to pass config values
+ }
+
+ @Override
+ public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
+
+
+ final String indexService = "druid/overlord"; // Your overlord's druid.service
+ final String discoveryPath = "/druid/discovery"; // Your overlord's druid.discovery.curator.path
+ final String dataSource = "test";
+ final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
+ List<AggregatorFactory> aggregator = ImmutableList.<AggregatorFactory>of(
+ new CountAggregatorFactory(
+ "click"
+ )
+ );
+ // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
+ final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
+ {
+ @Override
+ public DateTime timestamp(Map<String, Object> theMap)
+ {
+ return new DateTime(theMap.get("timestamp"));
+ }
+ };
+
+ // Tranquility uses ZooKeeper (through Curator) for coordination.
+ final CuratorFramework curator = CuratorFrameworkFactory
+ .builder()
+ .connectString((String)conf.get("druid.tranquility.zk.connect")) // we can use Storm conf to get config values
+ .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
+ .build();
+ curator.start();
+
+ // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
+ // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
+ final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
+
+ // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
+ // done with Jackson. To use a different serializer, supply your own via .objectWriter(...).
+ // This sample does not provide one, so Jackson is used.
+ final Beam<Map<String, Object>> beam = DruidBeams
+ .builder(timestamper)
+ .curator(curator)
+ .discoveryPath(discoveryPath)
+ .location(DruidLocation.create(indexService, dataSource))
+ .timestampSpec(timestampSpec)
+ .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularities.MINUTE))
+ .tuning(
+ ClusteredBeamTuning
+ .builder()
+ .segmentGranularity(Granularity.HOUR)
+ .windowPeriod(new Period("PT10M"))
+ .partitions(1)
+ .replicants(1)
+ .build()
+ )
+ .druidBeamConfig(
+ DruidBeamConfig
+ .builder()
+ .indexRetryPeriod(new Period("PT10M"))
+ .build())
+ .buildBeam();
+
+ return beam;
+ }
+}
\ No newline at end of file
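The rollup configured above groups rows by the publisher and advertiser dimensions at MINUTE query granularity and aggregates them with a CountAggregatorFactory named "click". A count aggregator counts ingested rows, so the numeric "click" values emitted by the sample spouts are not summed. The plain-Java approximation below (RollupSketch is a hypothetical name; it does not call Druid) shows roughly what that rollup computes.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RollupSketch {

    // Builds an event shaped like the ones SimpleSpout and SimpleBatchSpout emit.
    public static Map<String, Object> event(String timestamp, int click) {
        Map<String, Object> e = new LinkedHashMap<>();
        e.put("timestamp", timestamp);
        e.put("publisher", "foo.com");
        e.put("advertiser", "google.com");
        e.put("click", click); // not read by a count aggregator
        return e;
    }

    // Groups events by (minute, publisher, advertiser) and counts rows per group,
    // approximating MINUTE query granularity plus a count aggregator.
    public static Map<String, Long> rollup(List<Map<String, Object>> events) {
        Map<String, Long> rows = new LinkedHashMap<>();
        for (Map<String, Object> event : events) {
            Instant ts = OffsetDateTime.parse((String) event.get("timestamp")).toInstant();
            Instant minute = ts.truncatedTo(ChronoUnit.MINUTES);
            String key = minute + "|" + event.get("publisher") + "|" + event.get("advertiser");
            rows.merge(key, 1L, Long::sum); // one more row in this group
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> events = Arrays.asList(
                event("2016-08-08T10:00:05.000+05:30", 1),
                event("2016-08-08T10:00:40.000+05:30", 2),
                event("2016-08-08T10:01:10.000+05:30", 3));
        System.out.println(rollup(events).values()); // prints [2, 1]
    }
}
```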
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
new file mode 100644
index 0000000..99a6f67
--- /dev/null
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.druid.bolt.DruidBeamBolt;
+import org.apache.storm.druid.bolt.DruidBeamFactory;
+import org.apache.storm.druid.bolt.DruidConfig;
+import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
+import org.apache.storm.druid.bolt.TupleDruidEventMapper;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Sample application that uses the Druid bolt.
+ *
+ * To test this, you need a running Druid deployment. Refer to the Druid quickstart to run Druid:
+ * http://druid.io/docs/latest/tutorials/quickstart.html
+ */
+public class SampleDruidBoltTopology {
+
+ public static void main(String[] args) throws Exception {
+ if (args.length == 0) {
+ throw new IllegalArgumentException("There should be at least one argument. Run as `SampleDruidBoltTopology <zk-url> [topology-name]`");
+ }
+
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+ topologyBuilder.setSpout("event-gen", new SimpleSpout(), 5);
+ DruidBeamFactory<Map<String, Object>> druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+ DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
+ ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+ DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
+ topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
+ topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt", druidConfig.getDiscardStreamId());
+
+ Config conf = new Config();
+ conf.setDebug(true);
+ conf.put("druid.tranquility.zk.connect", args[0]);
+
+ if (args.length > 1) {
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopologyWithProgressBar(args[1], conf, topologyBuilder.createTopology());
+ } else {
+ conf.setMaxTaskParallelism(3);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("druid-test", conf, topologyBuilder.createTopology());
+
+ Thread.sleep(30000);
+
+ cluster.shutdown();
+ System.exit(0);
+ }
+ }
+
+ private static class PrinterBolt extends BaseBasicBolt {
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ System.out.println(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer ofd) {
+ }
+
+ }
+
+}
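SampleDruidBoltTopology takes the ZooKeeper connect string as its first argument and, if a second argument is present, submits to a cluster under that topology name; otherwise it runs in a LocalCluster for 30 seconds. A minimal sketch of that argument handling, using a hypothetical LaunchArgs helper that is not part of storm-druid:

```java
import java.util.Optional;

public class LaunchArgs {
    public final String zkConnect;              // value the sample stores under "druid.tranquility.zk.connect"
    public final Optional<String> topologyName; // present => remote submission via StormSubmitter

    private LaunchArgs(String zkConnect, Optional<String> topologyName) {
        this.zkConnect = zkConnect;
        this.topologyName = topologyName;
    }

    public static LaunchArgs parse(String[] args) {
        if (args.length == 0) {
            throw new IllegalArgumentException(
                    "Run as `SampleDruidBoltTopology <zk-url> [topology-name]`");
        }
        return new LaunchArgs(args[0], Optional.ofNullable(args.length > 1 ? args[1] : null));
    }

    // True when the sample would submit to a cluster rather than use LocalCluster.
    public boolean isRemote() {
        return topologyName.isPresent();
    }

    public static void main(String[] args) {
        System.out.println(parse(new String[] {"localhost:2181"}).isRemote());                   // prints false
        System.out.println(parse(new String[] {"localhost:2181", "druid-topology"}).isRemote()); // prints true
    }
}
```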
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
new file mode 100644
index 0000000..0e20ecd
--- /dev/null
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.druid.bolt.DruidBeamFactory;
+import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
+import org.apache.storm.druid.bolt.TupleDruidEventMapper;
+import org.apache.storm.druid.trident.DruidBeamStateFactory;
+import org.apache.storm.druid.trident.DruidBeamStateUpdater;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.Consumer;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Sample application that uses the Druid Trident state.
+ *
+ * To test this, you need a running Druid deployment. Refer to the Druid quickstart to run Druid:
+ * http://druid.io/docs/latest/tutorials/quickstart.html
+ */
+public class SampleDruidBoltTridentTopology {
+ private static final Logger LOG = LoggerFactory.getLogger(SampleDruidBoltTridentTopology.class);
+
+ public static void main(String[] args) throws Exception {
+ if (args.length == 0) {
+ throw new IllegalArgumentException("There should be at least one argument. Run as `SampleDruidBoltTridentTopology <zk-url> [topology-name]`");
+ }
+
+ TridentTopology tridentTopology = new TridentTopology();
+ DruidBeamFactory<Map<String, Object>> druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+ ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+
+ final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
+
+ stream.peek(new Consumer() {
+ @Override
+ public void accept(TridentTuple input) {
+ LOG.info("########### Received tuple: [{}]", input);
+ }
+ }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater<Map<String, Object>>());
+
+ Config conf = new Config();
+
+
+
+ conf.setDebug(true);
+ conf.put("druid.tranquility.zk.connect", args[0]);
+
+ if (args.length > 1) {
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopologyWithProgressBar(args[1], conf, tridentTopology.build());
+ } else {
+ conf.setMaxTaskParallelism(3);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("druid-test", conf, tridentTopology.build());
+
+ Thread.sleep(30000);
+
+ cluster.shutdown();
+ System.exit(0);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
new file mode 100644
index 0000000..cb30b7c
--- /dev/null
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.joda.time.DateTime;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BatchSpout implementation for event batch generation.
+ */
+public class SimpleBatchSpout implements IBatchSpout {
+
+ private final int batchSize;
+ private final Map<Long, List<List<Object>>> batches = new HashMap<>();
+
+ public SimpleBatchSpout(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context) {
+ }
+
+ @Override
+ public void emitBatch(long batchId, TridentCollector collector) {
+ List<List<Object>> values;
+ if (batches.containsKey(batchId)) {
+ values = batches.get(batchId);
+ } else {
+ values = new ArrayList<>();
+ for (int i = 0; i < batchSize; i++) {
+ List<Object> value = new ArrayList<>();
+ Map<String, Object> event = new LinkedHashMap<>();
+ event.put("timestamp", new DateTime().toString());
+ event.put("publisher", "foo.com");
+ event.put("advertiser", "google.com");
+ event.put("click", i);
+ value.add(event);
+ values.add(value);
+ }
+ batches.put(batchId, values);
+ }
+ for (List<Object> value : values) {
+ collector.emit(value);
+ }
+
+ }
+
+ @Override
+ public void ack(long batchId) {
+ batches.remove(batchId);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Config conf = new Config();
+ conf.setMaxTaskParallelism(1);
+ return conf;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return SimpleSpout.DEFAULT_FIELDS;
+ }
+}
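SimpleBatchSpout caches each generated batch by batch id so that, if Trident replays a batch, the same events are emitted again; the cache entry is dropped when the batch is acked. A Storm-free sketch of that cache follows, with a hypothetical ReplayableBatches class (it does not implement IBatchSpout) and integers standing in for event maps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplayableBatches {
    private final int batchSize;
    private final Map<Long, List<Integer>> batches = new HashMap<>();
    private int nextValue = 0;

    public ReplayableBatches(int batchSize) {
        this.batchSize = batchSize;
    }

    // Generates a new batch only for ids that are not already pending; a replayed id
    // gets exactly the same values back.
    public List<Integer> emitBatch(long batchId) {
        return batches.computeIfAbsent(batchId, id -> {
            List<Integer> values = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                values.add(nextValue++);
            }
            return values;
        });
    }

    // Once Trident acks a batch it will not be replayed, so the cached copy can go.
    public void ack(long batchId) {
        batches.remove(batchId);
    }

    public int pending() {
        return batches.size();
    }

    public static void main(String[] args) {
        ReplayableBatches spout = new ReplayableBatches(3);
        List<Integer> first = spout.emitBatch(1L);
        System.out.println(first.equals(spout.emitBatch(1L))); // prints true
        spout.ack(1L);
        System.out.println(spout.emitBatch(2L)); // prints [3, 4, 5]
    }
}
```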
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
new file mode 100644
index 0000000..ec0f0bf
--- /dev/null
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.druid;
+
+import org.apache.storm.druid.bolt.TupleDruidEventMapper;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.joda.time.DateTime;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class SimpleSpout extends BaseRichSpout {
+ SpoutOutputCollector _collector;
+ int i = 1;
+
+ public static final Fields DEFAULT_FIELDS = new Fields(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ _collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ Utils.sleep(1000);
+ Map<String, Object> event = new LinkedHashMap<>();
+ event.put("timestamp", new DateTime().toString());
+ event.put("publisher", "foo.com");
+ event.put("advertiser", "google.com");
+ event.put("click", i++);
+ _collector.emit(new Values(event));
+ }
+
+ @Override
+ public void ack(Object id) {
+ }
+
+ @Override
+ public void fail(Object id) {
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(DEFAULT_FIELDS);
+ }
+
+}
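SimpleSpout and SimpleBatchSpout stamp each event with the current time. That matters because Tranquility drops events whose timestamps fall outside the windowPeriod (PT10M in SampleDruidBeamFactoryImpl). The check below is a simplified, assumed model of that rule (accept an event if it is within windowPeriod of now); Tranquility's actual logic may differ in details, so treat it only as an approximation. WindowCheck is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;

public class WindowCheck {
    // Simplified model: accept only events whose timestamp is within +/- windowPeriod of now.
    public static boolean withinWindow(Instant eventTime, Instant now, Duration windowPeriod) {
        Duration skew = Duration.between(eventTime, now).abs();
        return skew.compareTo(windowPeriod) <= 0;
    }

    public static void main(String[] args) {
        Duration window = Duration.parse("PT10M"); // same value as windowPeriod in the sample factory
        Instant now = Instant.parse("2016-08-08T10:30:00Z");
        System.out.println(withinWindow(now.minusSeconds(60), now, window));  // prints true
        System.out.println(withinWindow(now.minusSeconds(900), now, window)); // prints false
    }
}
```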
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 763b696..14f45f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -311,6 +311,7 @@
<module>external/storm-opentsdb</module>
<module>external/storm-kafka-monitor</module>
<module>external/storm-kinesis</module>
+ <module>external/storm-druid</module>
</modules>
<dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index dd8cb49..12d15cc 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -382,7 +382,20 @@
<include>storm*jar</include>
</includes>
</fileSet>
-
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-druid/target</directory>
+ <outputDirectory>external/storm-druid</outputDirectory>
+ <includes>
+ <include>storm*jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-druid</directory>
+ <outputDirectory>external/storm-druid</outputDirectory>
+ <includes>
+ <include>README.*</include>
+ </includes>
+ </fileSet>
<!-- $STORM_HOME/extlib -->
<fileSet>
<directory></directory>
[3/3] storm git commit: Added STORM-1979 to CHANGELOG
Posted by sa...@apache.org.
Added STORM-1979 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f5c55ac6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f5c55ac6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f5c55ac6
Branch: refs/heads/master
Commit: f5c55ac608fbd05e08530cc744c0d1db7fe6374b
Parents: 52b67b4
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Tue Aug 9 10:37:05 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Aug 9 10:37:05 2016 +0530
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f5c55ac6/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4c8b35e..ff4f157 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-1979: Storm Druid Connector implementation.
* STORM-1277: port backtype.storm.daemon.executor to java
* STORM-2020: Stop using sun internal classes.
* STORM-2021: Fix license.
[2/3] storm git commit: Merge branch 'storm-druid' of https://github.com/omkreddy/storm into STORM-1979
Posted by sa...@apache.org.
Merge branch 'storm-druid' of https://github.com/omkreddy/storm into STORM-1979
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/52b67b4a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/52b67b4a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/52b67b4a
Branch: refs/heads/master
Commit: 52b67b4ae677b17e0d255fd103d2d24cd58ff2c0
Parents: 28563ec 422e053
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Tue Aug 9 10:27:30 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Aug 9 10:27:30 2016 +0530
----------------------------------------------------------------------
external/storm-druid/README.md | 143 +++++++++++++++++++
external/storm-druid/pom.xml | 83 +++++++++++
.../apache/storm/druid/bolt/DruidBeamBolt.java | 110 ++++++++++++++
.../storm/druid/bolt/DruidBeamFactory.java | 29 ++++
.../apache/storm/druid/bolt/DruidConfig.java | 104 ++++++++++++++
.../druid/bolt/ITupleDruidEventMapper.java | 38 +++++
.../storm/druid/bolt/TupleDruidEventMapper.java | 44 ++++++
.../storm/druid/trident/DruidBeamState.java | 96 +++++++++++++
.../druid/trident/DruidBeamStateFactory.java | 42 ++++++
.../druid/trident/DruidBeamStateUpdater.java | 48 +++++++
.../storm/druid/SampleDruidBeamFactoryImpl.java | 122 ++++++++++++++++
.../storm/druid/SampleDruidBoltTopology.java | 95 ++++++++++++
.../druid/SampleDruidBoltTridentTopology.java | 91 ++++++++++++
.../apache/storm/druid/SimpleBatchSpout.java | 95 ++++++++++++
.../org/apache/storm/druid/SimpleSpout.java | 68 +++++++++
pom.xml | 1 +
storm-dist/binary/src/main/assembly/binary.xml | 15 +-
17 files changed, 1223 insertions(+), 1 deletion(-)
----------------------------------------------------------------------