Posted to commits@storm.apache.org by ka...@apache.org on 2018/06/09 11:44:12 UTC
[1/2] storm git commit: STORM-3097: Remove storm-druid
Repository: storm
Updated Branches:
refs/heads/master 4aa6b5e18 -> 0eb6b5116
STORM-3097: Remove storm-druid
This closes #2707
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b6cab8dc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b6cab8dc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b6cab8dc
Branch: refs/heads/master
Commit: b6cab8dca97e2238aef8e45118f6946fc2d35049
Parents: 4aa6b5e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Jun 6 15:11:02 2018 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jun 9 20:43:33 2018 +0900
----------------------------------------------------------------------
docs/index.md | 1 -
docs/storm-druid.md | 119 ---------------
external/storm-druid/README.md | 147 -------------------
external/storm-druid/pom.xml | 111 --------------
.../apache/storm/druid/bolt/DruidBeamBolt.java | 116 ---------------
.../storm/druid/bolt/DruidBeamFactory.java | 29 ----
.../apache/storm/druid/bolt/DruidConfig.java | 104 -------------
.../druid/bolt/ITupleDruidEventMapper.java | 38 -----
.../storm/druid/bolt/TupleDruidEventMapper.java | 44 ------
.../storm/druid/trident/DruidBeamState.java | 96 ------------
.../druid/trident/DruidBeamStateFactory.java | 42 ------
.../druid/trident/DruidBeamStateUpdater.java | 48 ------
.../storm/druid/SampleDruidBeamFactoryImpl.java | 122 ---------------
.../storm/druid/SampleDruidBoltTopology.java | 94 ------------
.../druid/SampleDruidBoltTridentTopology.java | 90 ------------
.../apache/storm/druid/SimpleBatchSpout.java | 95 ------------
.../org/apache/storm/druid/SimpleSpout.java | 68 ---------
pom.xml | 1 -
18 files changed, 1365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index b1d881f..2697c47 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -107,7 +107,6 @@ But small change will not affect the user experience. We will notify the user wh
* [Mongodb Integration](storm-mongodb.html)
* [OpenTSDB Integration](storm-opentsdb.html)
* [Kinesis Integration](storm-kinesis.html)
-* [Druid Integration](storm-druid.html)
* [PMML Integration](storm-pmml.html)
* [Kestrel Integration](Kestrel-and-Storm.html)
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/docs/storm-druid.md
----------------------------------------------------------------------
diff --git a/docs/storm-druid.md b/docs/storm-druid.md
deleted file mode 100644
index bed50dc..0000000
--- a/docs/storm-druid.md
+++ /dev/null
@@ -1,119 +0,0 @@
-# Storm Druid Bolt and TridentState
-
-This module provides core Storm and Trident bolt implementations for writing data to [Druid](http://druid.io/) data store.
-This implementation uses Druid's [Tranquility library](https://github.com/druid-io/tranquility) to send messages to druid.
-
-Some of the implementation details are borrowed from existing [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md).
-This new Bolt added to support latest storm release and maintain the bolt in the storm repo.
-
-### Core Bolt
-Below example describes the usage of core bolt which is `org.apache.storm.druid.bolt.DruidBeamBolt`
-By default this Bolt expects to receive tuples in which "event" field gives your event type.
-This logic can be changed by implementing ITupleDruidEventMapper interface.
-
-```java
-
- DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
- DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
- ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
- DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
- topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
- topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());
-
-```
-
-
-### Trident State
-
-```java
- DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
- ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-
- final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
-
- stream.peek(new Consumer() {
- @Override
- public void accept(TridentTuple input) {
- LOG.info("########### Received tuple: [{}]", input);
- }
- }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
-
-```
-
-### Sample Beam Factory Implementation
-Druid bolt must be supplied with a BeamFactory. You can implement one of these using the [DruidBeams builder's] (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) "buildBeam()" method.
-See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
-For more details refer [Tranquility library](https://github.com/druid-io/tranquility) docs.
-
-```java
-
-public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
-
- @Override
- public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
-
-
- final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
- final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
- final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables.
- final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
- List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
- new CountAggregatorFactory(
- "click"
- )
- );
- // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
- final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
- {
- @Override
- public DateTime timestamp(Map<String, Object> theMap)
- {
- return new DateTime(theMap.get("timestamp"));
- }
- };
-
- // Tranquility uses ZooKeeper (through Curator) for coordination.
- final CuratorFramework curator = CuratorFrameworkFactory
- .builder()
- .connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf
- .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
- .build();
- curator.start();
-
- // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
- // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
- final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
-
- // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
- // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.
- // In this case, we won't provide one, so we're just using Jackson.
- final Beam<Map<String, Object>> beam = DruidBeams
- .builder(timestamper)
- .curator(curator)
- .discoveryPath(discoveryPath)
- .location(DruidLocation.create(indexService, dataSource))
- .timestampSpec(timestampSpec)
- .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
- .tuning(
- ClusteredBeamTuning
- .builder()
- .segmentGranularity(Granularity.HOUR)
- .windowPeriod(new Period("PT10M"))
- .partitions(1)
- .replicants(1)
- .build()
- )
- .druidBeamConfig(
- DruidBeamConfig
- .builder()
- .indexRetryPeriod(new Period("PT10M"))
- .build())
- .buildBeam();
-
- return beam;
- }
-}
-
-```
-
-Example code is available [here.](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid)
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/README.md
----------------------------------------------------------------------
diff --git a/external/storm-druid/README.md b/external/storm-druid/README.md
deleted file mode 100644
index de7f44c..0000000
--- a/external/storm-druid/README.md
+++ /dev/null
@@ -1,147 +0,0 @@
-# Storm Druid Bolt and TridentState
-
-This module provides core Storm and Trident bolt implementations for writing data to [Druid](http://druid.io/) data store.
-This implementation uses Druid's [Tranquility library](https://github.com/druid-io/tranquility) to send messages to druid.
-
-Some of the implementation details are borrowed from existing [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md).
-This new Bolt added to support latest storm release and maintain the bolt in the storm repo.
-
-### Core Bolt
-Below example describes the usage of core bolt which is `org.apache.storm.druid.bolt.DruidBeamBolt`
-By default this Bolt expects to receive tuples in which "event" field gives your event type.
-This logic can be changed by implementing ITupleDruidEventMapper interface.
-
-```java
-
- DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
- DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
- ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
- DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
- topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
- topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());
-
-```
-
-
-### Trident State
-
-```java
- DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
- ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-
- final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
-
- stream.peek(new Consumer() {
- @Override
- public void accept(TridentTuple input) {
- LOG.info("########### Received tuple: [{}]", input);
- }
- }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
-
-```
-
-### Sample Beam Factory Implementation
-Druid bolt must be supplied with a BeamFactory. You can implement one of these using the [DruidBeams builder's] (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) "buildBeam()" method.
-See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
-For more details refer [Tranquility library](https://github.com/druid-io/tranquility) docs.
-
-```java
-
-public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
-
- @Override
- public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
-
-
- final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
- final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
- final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables.
- final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
- List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
- new CountAggregatorFactory(
- "click"
- )
- );
- // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
- final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
- {
- @Override
- public DateTime timestamp(Map<String, Object> theMap)
- {
- return new DateTime(theMap.get("timestamp"));
- }
- };
-
- // Tranquility uses ZooKeeper (through Curator) for coordination.
- final CuratorFramework curator = CuratorFrameworkFactory
- .builder()
- .connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf
- .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
- .build();
- curator.start();
-
- // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
- // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
- final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
-
- // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
- // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.
- // In this case, we won't provide one, so we're just using Jackson.
- final Beam<Map<String, Object>> beam = DruidBeams
- .builder(timestamper)
- .curator(curator)
- .discoveryPath(discoveryPath)
- .location(DruidLocation.create(indexService, dataSource))
- .timestampSpec(timestampSpec)
- .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
- .tuning(
- ClusteredBeamTuning
- .builder()
- .segmentGranularity(Granularity.HOUR)
- .windowPeriod(new Period("PT10M"))
- .partitions(1)
- .replicants(1)
- .build()
- )
- .druidBeamConfig(
- DruidBeamConfig
- .builder()
- .indexRetryPeriod(new Period("PT10M"))
- .build())
- .buildBeam();
-
- return beam;
- }
-}
-
-```
-
-Example code is available [here.](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid)
-
-This version is built to work with Druid 0.8.x. This connector uses the Tranquility module built for Scala 2.11.
-This provides a shaded jar with all Tranquility dependencies. You should include Scala 2.11 dependency in your
-application.
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
- * Sriharha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
- * Satish Duggana ([satishd@apache.org](mailto:satishd@apache.org))
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-druid/pom.xml b/external/storm-druid/pom.xml
deleted file mode 100644
index 831aa31..0000000
--- a/external/storm-druid/pom.xml
+++ /dev/null
@@ -1,111 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>storm-druid</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-client</artifactId>
- <version>${project.version}</version>
- <scope>${provided.scope}</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.druid</groupId>
- <artifactId>tranquility-core_2.11</artifactId>
- <version>${druid.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>2.11.8</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>util-core_2.11</artifactId>
- <version>6.30.0</version>
- </dependency>
- <!-- tranquility library depends on jackson 2.4.6 version -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>2.4.6</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.4.6</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-smile</artifactId>
- <version>2.4.6</version>
- </dependency>
-
- <!--test dependencies -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <!--Note - the version would be inherited-->
- <configuration>
- <maxAllowedViolations>29</maxAllowedViolations>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
deleted file mode 100644
index 92b8a21..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.bolt;
-
-import com.metamx.tranquility.tranquilizer.MessageDroppedException;
-import com.metamx.tranquility.tranquilizer.Tranquilizer;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * Basic bolt implementation for storing data to Druid datastore.
- * <p/>
- * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
- * to send to druid store.
- * Some of the concepts are borrowed from Tranquility storm connector implementation.
- * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
- *
- * By default this Bolt expects to receive tuples in which "event" field gives your event type.
- * This logic can be changed by implementing ITupleDruidEventMapper interface.
- * <p/>
- *
- */
-public class DruidBeamBolt<E> extends BaseTickTupleAwareRichBolt {
- private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
-
- private volatile OutputCollector collector;
- private DruidBeamFactory<E> beamFactory = null;
- private DruidConfig druidConfig = null;
- private Tranquilizer<E> tranquilizer = null;
- private ITupleDruidEventMapper<E> druidEventMapper = null;
-
- public DruidBeamBolt(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper, DruidConfig.Builder druidConfigBuilder) {
- this.beamFactory = beamFactory;
- this.druidConfig = druidConfigBuilder.build();
- this.druidEventMapper = druidEventMapper;
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- tranquilizer = Tranquilizer.builder()
- .maxBatchSize(druidConfig.getMaxBatchSize())
- .maxPendingBatches(druidConfig.getMaxPendingBatches())
- .lingerMillis(druidConfig.getLingerMillis())
- .blockOnFull(druidConfig.isBlockOnFull())
- .build(beamFactory.makeBeam(topoConf, context));
- this.tranquilizer.start();
- }
-
- @Override
- protected void process(final Tuple tuple) {
- final E mappedEvent = druidEventMapper.getEvent(tuple);
- Future future = tranquilizer.send(mappedEvent);
- LOG.debug("Sent tuple : [{}]", mappedEvent);
-
- future.addEventListener(new FutureEventListener() {
- @Override
- public void onFailure(Throwable cause) {
- if (cause instanceof MessageDroppedException) {
- collector.ack(tuple);
- LOG.debug("Tuple Dropped due to MessageDroppedException {} : [{}]", cause.getMessage(), mappedEvent);
- if (druidConfig.getDiscardStreamId() != null)
- collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis()));
- } else {
- collector.fail(tuple);
- LOG.error("Tuple Processing Failed : [{}]", mappedEvent, cause);
- }
- }
-
- @Override
- public void onSuccess(Object value) {
- collector.ack(tuple);
- LOG.debug("Tuple Processing Success : [{}]", mappedEvent);
- }
- });
-
- }
-
- @Override
- public void cleanup() {
- tranquilizer.stop();
- }
-
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream(druidConfig.getDiscardStreamId(), new Fields("tuple", "timestamp"));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
deleted file mode 100644
index 7d1866f..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.bolt;
-
-import com.metamx.tranquility.beam.Beam;
-import org.apache.storm.task.IMetricsContext;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public interface DruidBeamFactory<E> extends Serializable {
- public Beam<E> makeBeam(Map<?,?> conf, IMetricsContext metrics);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
deleted file mode 100644
index 081d9ff..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.bolt;
-
-import com.metamx.tranquility.tranquilizer.Tranquilizer;
-
-import java.io.Serializable;
-
-public class DruidConfig implements Serializable {
-
- public static final String DEFAULT_DISCARD_STREAM_ID = "druid-discard-stream";
-
- //Tranquilizer configs for DruidBeamBolt
- private int maxBatchSize;
- private int maxPendingBatches;
- private long lingerMillis;
- private boolean blockOnFull;
- private String discardStreamId;
-
- public int getMaxBatchSize() {
- return maxBatchSize;
- }
-
- public int getMaxPendingBatches() {
- return maxPendingBatches;
- }
-
- public long getLingerMillis() {
- return lingerMillis;
- }
-
- public boolean isBlockOnFull() {
- return blockOnFull;
- }
-
- public String getDiscardStreamId() {
- return discardStreamId;
- }
-
- private DruidConfig(Builder builder) {
- this.maxBatchSize = builder.maxBatchSize;
- this.maxPendingBatches = builder.maxPendingBatches;
- this.lingerMillis = builder.lingerMillis;
- this.blockOnFull = builder.blockOnFull;
- this.discardStreamId = builder.discardStreamId;
- }
-
- public static DruidConfig.Builder newBuilder() {
- return new Builder();
- }
-
- public static class Builder {
- private int maxBatchSize = Tranquilizer.DefaultMaxBatchSize();
- private int maxPendingBatches = Tranquilizer.DefaultMaxPendingBatches();
- private long lingerMillis = Tranquilizer.DefaultLingerMillis();
- private boolean blockOnFull = Tranquilizer.DefaultBlockOnFull();
- private String discardStreamId = null;
-
- public Builder maxBatchSize(int maxBatchSize) {
- this.maxBatchSize = maxBatchSize;
- return this;
- }
-
- public Builder maxPendingBatches(int maxPendingBatches) {
- this.maxPendingBatches = maxPendingBatches;
- return this;
- }
-
- public Builder lingerMillis(int lingerMillis) {
- this.lingerMillis = lingerMillis;
- return this;
- }
-
- public Builder blockOnFull(boolean blockOnFull) {
- this.blockOnFull = blockOnFull;
- return this;
- }
-
- public Builder discardStreamId(String discardStreamId) {
- this.discardStreamId = discardStreamId;
- return this;
- }
-
- public DruidConfig build() {
- return new DruidConfig(this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
deleted file mode 100644
index 0ae0233..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.bolt;
-
-import org.apache.storm.tuple.ITuple;
-
-import java.io.Serializable;
-
-/**
- * This class gives a mapping of a {@link ITuple} to Druid Event
- *
- */
-public interface ITupleDruidEventMapper<E> extends Serializable {
-
- /**
- * Returns a Druid Event for a given {@code tuple}.
- *
- * @param tuple tuple instance
- */
- public E getEvent(ITuple tuple);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
deleted file mode 100644
index 67b7cc0..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.bolt;
-
-import org.apache.storm.tuple.ITuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Converts {@link ITuple} to Event
- */
-public final class TupleDruidEventMapper<E> implements ITupleDruidEventMapper<E> {
-
- public static final String DEFAULT_FIELD_NAME = "event";
-
- private final String eventFiledName;
-
- public TupleDruidEventMapper(String eventFiledName) {
- this.eventFiledName = eventFiledName;
- }
-
- @Override
- public E getEvent(ITuple tuple) {
- return (E) tuple.getValueByField(eventFiledName);
- }
-
-
-}
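Note on the mapper removed above: TupleDruidEventMapper looks up one named field on the tuple and casts the value to the event type. The block below is a minimal sketch of that pattern in plain Java. It assumes a Map<String, Object> stands in for Storm's ITuple, and the class name FieldEventMapperSketch is hypothetical, not part of storm-druid.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for the removed mapper: a Map<String, Object>
 * plays the role of Storm's ITuple so the sketch runs without Storm.
 */
public class FieldEventMapperSketch<E> {

    private final String eventFieldName;

    public FieldEventMapperSketch(String eventFieldName) {
        this.eventFieldName = eventFieldName;
    }

    /** Returns the value stored under the configured field, cast to E. */
    @SuppressWarnings("unchecked")
    public E getEvent(Map<String, Object> tuple) {
        // Unchecked cast: the caller must ensure the field really holds an E,
        // otherwise a ClassCastException surfaces later at the use site.
        return (E) tuple.get(eventFieldName);
    }

    public static void main(String[] args) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("publisher", "foo.com");

        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("event", event);

        FieldEventMapperSketch<Map<String, Object>> mapper =
                new FieldEventMapperSketch<>("event");
        System.out.println(mapper.getEvent(tuple)); // prints {publisher=foo.com}
    }
}
```

Because of type erasure the cast is not checked at the point of extraction, so a mismatch between the field contents and E is only detected when the event is first used.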
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
deleted file mode 100644
index e59fea9..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.trident;
-
-import com.metamx.tranquility.beam.Beam;
-import com.metamx.tranquility.beam.SendResult;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-
-/**
- * Trident {@link State} implementation for Druid.
- */
-public class DruidBeamState<E> implements State {
- private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);
-
- private Beam<E> beam = null;
- private ITupleDruidEventMapper<E> druidEventMapper = null;
-
- public DruidBeamState(Beam<E> beam, ITupleDruidEventMapper<E> druidEventMapper) {
- this.beam = beam;
- this.druidEventMapper = druidEventMapper;
- }
-
- public List<E> update(List<TridentTuple> tuples, TridentCollector collector) {
- List<E> events = new ArrayList<>(tuples.size());
- for (TridentTuple tuple: tuples) {
- events.add(druidEventMapper.getEvent(tuple));
- }
-
- LOG.info("Sending [{}] events", events.size());
- scala.collection.immutable.List<E> scalaList = scala.collection.JavaConversions.collectionAsScalaIterable(events).toList();
- Collection<Future<SendResult>> futureList = scala.collection.JavaConversions.asJavaCollection(beam.sendAll(scalaList));
- List<E> discardedEvents = new ArrayList<>();
-
- int index = 0;
- for (Future<SendResult> future : futureList) {
- try {
- SendResult result = Await.result(future);
- if (!result.sent()) {
- discardedEvents.add(events.get(index));
- }
- } catch (Exception e) {
- LOG.error("Failed in writing messages to Druid", e);
- }
- index++;
- }
-
- return discardedEvents;
-
- }
-
- public void close() {
- try {
- Await.result(beam.close());
- } catch (Exception e) {
- LOG.error("Error while closing Druid beam client", e);
- }
- }
-
- @Override
- public void beginCommit(Long txid) {
-
- }
-
- @Override
- public void commit(Long txid) {
-
- }
-}
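Note on DruidBeamState.update() above: it relies on the futures coming back in the same order as the events, walks them with a running index, and collects every event whose result reports it was not sent. A failed future is only logged, so that event is neither retried nor reported as discarded. The block below is a rough sketch of the same bookkeeping, assuming java.util.concurrent.CompletableFuture<Boolean> in place of Twitter's Future<SendResult>; DiscardTrackingSketch and collectDiscarded are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical sketch: CompletableFuture<Boolean> stands in for Future<SendResult>. */
public class DiscardTrackingSketch {

    /**
     * Waits on one future per event (same order as the events) and returns
     * the events whose future completed with false. Futures that complete
     * exceptionally are logged and skipped, as in the removed code.
     */
    public static <E> List<E> collectDiscarded(
            List<E> events, List<CompletableFuture<Boolean>> results)
            throws InterruptedException {
        List<E> discarded = new ArrayList<>();
        int index = 0;
        for (CompletableFuture<Boolean> result : results) {
            try {
                if (!result.get()) {
                    discarded.add(events.get(index));
                }
            } catch (ExecutionException e) {
                System.err.println("Send failed at index " + index + ": " + e.getCause());
            }
            index++; // advance even on failure so indexes stay aligned
        }
        return discarded;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> events = Arrays.asList("a", "b", "c");

        CompletableFuture<Boolean> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        List<CompletableFuture<Boolean>> results = Arrays.asList(
                CompletableFuture.completedFuture(true),
                CompletableFuture.completedFuture(false),
                failed);

        System.out.println(collectDiscarded(events, results)); // prints [b]
    }
}
```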
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
deleted file mode 100644
index bb61005..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.trident;
-
-import org.apache.storm.druid.bolt.DruidBeamFactory;
-import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-import java.util.Map;
-
-public class DruidBeamStateFactory<E> implements StateFactory {
- DruidBeamFactory beamFactory = null;
- ITupleDruidEventMapper druidEventMapper = null;
-
- public DruidBeamStateFactory(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper) {
- this.beamFactory = beamFactory;
- this.druidEventMapper = druidEventMapper;
- }
-
- @Override
- public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- return new DruidBeamState(beamFactory.makeBeam(conf , metrics), druidEventMapper);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
deleted file mode 100644
index d8e2b78..0000000
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid.trident;
-
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-public class DruidBeamStateUpdater<E> extends BaseStateUpdater<DruidBeamState<E>> {
- private static final Logger LOG = LoggerFactory.getLogger(DruidBeamStateUpdater.class);
-
- @Override
- public void updateState(DruidBeamState<E> state, List<TridentTuple> tuples, TridentCollector collector) {
- List<E> discardedTuples = state.update(tuples, collector);
- processDiscardedTuples(discardedTuples);
- }
-
- /**
- * Users can override this method to process the discarded Tuples
- * @param discardedTuples
- */
- protected void processDiscardedTuples(List<E> discardedTuples) {
- LOG.debug("discarded messages : [{}]" , discardedTuples);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
deleted file mode 100644
index 179b4f4..0000000
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.druid;
-
-import com.google.common.collect.ImmutableList;
-import com.metamx.common.Granularity;
-import com.metamx.tranquility.beam.Beam;
-import com.metamx.tranquility.beam.ClusteredBeamTuning;
-import com.metamx.tranquility.druid.DruidBeamConfig;
-import com.metamx.tranquility.druid.DruidBeams;
-import com.metamx.tranquility.druid.DruidDimensions;
-import com.metamx.tranquility.druid.DruidLocation;
-import com.metamx.tranquility.druid.DruidRollup;
-import com.metamx.tranquility.typeclass.Timestamper;
-import io.druid.data.input.impl.TimestampSpec;
-import io.druid.granularity.QueryGranularities;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.storm.druid.bolt.DruidBeamFactory;
-import org.apache.storm.task.IMetricsContext;
-import org.joda.time.DateTime;
-import org.joda.time.Period;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Druid bolt must be supplied with a BeamFactory. You can implement one of these using the
- * [DruidBeams builder's] (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala)
- * "buildBeam()" method. See the [Configuration documentation] (https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
- * For more details refer [Tranquility library] (https://github.com/druid-io/tranquility) docs.
- */
-public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
- Map<String, Object> factoryConf = null;
-
-
- public SampleDruidBeamFactoryImpl(Map<String, Object> factoryConf) {
- this.factoryConf = factoryConf; // This can be used to pass config values
- }
-
- @Override
- public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
-
-
- final String indexService = "druid/overlord"; // Your overlord's druid.service
- final String discoveryPath = "/druid/discovery"; // Your overlord's druid.discovery.curator.path
- final String dataSource = "test";
- final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
- List<AggregatorFactory> aggregator = ImmutableList.<AggregatorFactory>of(
- new CountAggregatorFactory(
- "click"
- )
- );
- // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
- final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
- {
- @Override
- public DateTime timestamp(Map<String, Object> theMap)
- {
- return new DateTime(theMap.get("timestamp"));
- }
- };
-
- // Tranquility uses ZooKeeper (through Curator) for coordination.
- final CuratorFramework curator = CuratorFrameworkFactory
- .builder()
- .connectString((String)conf.get("druid.tranquility.zk.connect")) // we can use Storm conf to get config values
- .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
- .build();
- curator.start();
-
- // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
- // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
- final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
-
- // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
- // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.
- // In this case, we won't provide one, so we're just using Jackson.
- final Beam<Map<String, Object>> beam = DruidBeams
- .builder(timestamper)
- .curator(curator)
- .discoveryPath(discoveryPath)
- .location(DruidLocation.create(indexService, dataSource))
- .timestampSpec(timestampSpec)
- .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularities.MINUTE))
- .tuning(
- ClusteredBeamTuning
- .builder()
- .segmentGranularity(Granularity.HOUR)
- .windowPeriod(new Period("PT10M"))
- .partitions(1)
- .replicants(1)
- .build()
- )
- .druidBeamConfig(
- DruidBeamConfig
- .builder()
- .indexRetryPeriod(new Period("PT10M"))
- .build())
- .buildBeam();
-
- return beam;
- }
-}
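Note on the Timestamper in the sample factory above: Tranquility has to pull a timestamp out of each event, and the sample does so by reading the "timestamp" key of the map, which the sample spouts fill with an ISO 8601 string. The block below sketches that extraction step on its own, assuming java.time in place of Joda-Time; TimestampExtractionSketch and timestampOf are hypothetical names and do not belong to Tranquility or storm-druid.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of extracting an ISO 8601 timestamp from a map-shaped event. */
public class TimestampExtractionSketch {

    /** Parses the event's "timestamp" field, which must carry a UTC offset or "Z". */
    public static Instant timestampOf(Map<String, Object> event) {
        Object raw = event.get("timestamp");
        if (raw == null) {
            throw new IllegalArgumentException("event has no \"timestamp\" field");
        }
        // OffsetDateTime.parse accepts ISO 8601 strings such as
        // 2018-06-09T20:43:33+09:00 or 2018-06-09T11:43:33Z.
        return OffsetDateTime.parse(raw.toString()).toInstant();
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("timestamp", "2018-06-09T20:43:33+09:00");
        event.put("publisher", "foo.com");

        System.out.println(timestampOf(event)); // prints 2018-06-09T11:43:33Z
    }
}
```

Failing fast on a missing field is a choice made for this sketch; the removed sample passes the raw value straight to the Joda-Time DateTime constructor.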
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
deleted file mode 100644
index 298353c..0000000
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.druid.bolt.DruidBeamBolt;
-import org.apache.storm.druid.bolt.DruidBeamFactory;
-import org.apache.storm.druid.bolt.DruidConfig;
-import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
-import org.apache.storm.druid.bolt.TupleDruidEventMapper;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Sample application to use Druid bolt.
- *
- * To test this we need to deploy Druid application. Refer Druid quickstart to run druid.
- * http://druid.io/docs/latest/tutorials/quickstart.html
- */
-public class SampleDruidBoltTopology {
-
- public static void main(String[] args) throws Exception {
- if(args.length == 0) {
- throw new IllegalArgumentException("There should be at least one argument. Run as `SampleDruidBoltTopology <zk-url>`");
- }
-
- TopologyBuilder topologyBuilder = new TopologyBuilder();
-
- topologyBuilder.setSpout("event-gen", new SimpleSpout(), 5);
- DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
- DruidConfig.Builder builder = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID);
- ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
- DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, builder);
- topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
- topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , DruidConfig.DEFAULT_DISCARD_STREAM_ID);
-
- Config conf = new Config();
- conf.setDebug(true);
- conf.put("druid.tranquility.zk.connect", args[0]);
-
- if (args.length > 1) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopologyWithProgressBar(args[1], conf, topologyBuilder.createTopology());
- } else {
- conf.setMaxTaskParallelism(3);
-
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("druid-test", conf, topologyBuilder.createTopology());) {
- Thread.sleep(30000);
- }
- System.exit(0);
- }
- }
-
- private static class PrinterBolt extends BaseBasicBolt {
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- System.out.println(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer ofd) {
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
deleted file mode 100644
index 3de18c1..0000000
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.druid.bolt.DruidBeamFactory;
-import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
-import org.apache.storm.druid.bolt.TupleDruidEventMapper;
-import org.apache.storm.druid.trident.DruidBeamStateFactory;
-import org.apache.storm.druid.trident.DruidBeamStateUpdater;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.Consumer;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Sample application to use Druid Trident bolt.
- *
- * To test this we need to deploy Druid application. Refer Druid quickstart to run druid.
- * http://druid.io/docs/latest/tutorials/quickstart.html
- */
-public class SampleDruidBoltTridentTopology {
- private static final Logger LOG = LoggerFactory.getLogger(SampleDruidBoltTridentTopology.class);
-
- public static void main(String[] args) throws Exception {
- if(args.length == 0) {
- throw new IllegalArgumentException("There should be at least one argument. Run as `SampleDruidBoltTridentTopology <zk-url>`");
- }
-
- TridentTopology tridentTopology = new TridentTopology();
- DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
- ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-
- final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
-
- stream.peek(new Consumer() {
- @Override
- public void accept(TridentTuple input) {
- LOG.info("########### Received tuple: [{}]", input);
- }
- }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
-
- Config conf = new Config();
-
-
-
- conf.setDebug(true);
- conf.put("druid.tranquility.zk.connect", args[0]);
-
- if (args.length > 1) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopologyWithProgressBar(args[1], conf, tridentTopology.build());
- } else {
- conf.setMaxTaskParallelism(3);
-
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("druid-test", conf, tridentTopology.build());) {
- Thread.sleep(30000);
- }
- System.exit(0);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
deleted file mode 100644
index 2511ac8..0000000
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.druid;
-
-import org.apache.storm.Config;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.joda.time.DateTime;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * BatchSpout implementation for event batch generation.
- */
-public class SimpleBatchSpout implements IBatchSpout {
-
- private int batchSize;
- private final Map<Long, List<List<Object>>> batches = new HashMap<>();
-
- public SimpleBatchSpout(int batchSize) {
- this.batchSize = batchSize;
- }
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context) {
- }
-
- @Override
- public void emitBatch(long batchId, TridentCollector collector) {
- List<List<Object>> values;
- if(batches.containsKey(batchId)) {
- values = batches.get(batchId);
- } else {
- values = new ArrayList<>();
- for (int i = 0; i < batchSize; i++) {
- List<Object> value = new ArrayList<>();
- Map<String, Object> event = new LinkedHashMap<>();
- event.put("timestamp", new DateTime().toString());
- event.put("publisher", "foo.com");
- event.put("advertiser", "google.com");
- event.put("click", i);
- value.add(event);
- values.add(value);
- }
- batches.put(batchId, values);
- }
- for (List<Object> value : values) {
- collector.emit(value);
- }
-
- }
-
- @Override
- public void ack(long batchId) {
- batches.remove(batchId);
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Config conf = new Config();
- conf.setMaxTaskParallelism(1);
- return conf;
- }
-
- @Override
- public Fields getOutputFields() {
- return SimpleSpout.DEFAULT_FIELDS;
- }
-}
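Note on SimpleBatchSpout above: emitBatch() caches every generated batch under its batch id, so a replay of the same id re-emits identical tuples, and ack() drops the cached batch once it is no longer needed. The block below is a small sketch of that replay cache in plain Java with no Storm types; ReplayableBatchSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch of a batch source that can replay unacknowledged batches. */
public class ReplayableBatchSketch<T> {

    private final int batchSize;
    private final Supplier<T> generator;
    private final Map<Long, List<T>> pending = new HashMap<>();

    public ReplayableBatchSketch(int batchSize, Supplier<T> generator) {
        this.batchSize = batchSize;
        this.generator = generator;
    }

    /** Returns the batch for batchId, generating it only on the first request. */
    public List<T> emitBatch(long batchId) {
        return pending.computeIfAbsent(batchId, id -> {
            List<T> values = new ArrayList<>(batchSize);
            for (int i = 0; i < batchSize; i++) {
                values.add(generator.get());
            }
            return values;
        });
    }

    /** Forgets a batch once it has been fully processed. */
    public void ack(long batchId) {
        pending.remove(batchId);
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        ReplayableBatchSketch<Integer> source =
                new ReplayableBatchSketch<>(3, counter::getAndIncrement);

        System.out.println(source.emitBatch(1L)); // prints [0, 1, 2]
        System.out.println(source.emitBatch(1L)); // replay, prints [0, 1, 2]
        source.ack(1L);
        System.out.println(source.emitBatch(2L)); // prints [3, 4, 5]
    }
}
```

The cache grows until batches are acknowledged, which is acceptable for a test spout but would need a bound in a long-running source.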
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
deleted file mode 100644
index d2010e9..0000000
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.druid;
-
-import org.apache.storm.druid.bolt.TupleDruidEventMapper;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.joda.time.DateTime;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class SimpleSpout extends BaseRichSpout {
- SpoutOutputCollector _collector;
- int i = 1;
-
- public static final Fields DEFAULT_FIELDS = new Fields(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public void nextTuple() {
- Utils.sleep(1000);
- Map<String, Object> event = new LinkedHashMap<>();
- event.put("timestamp", new DateTime().toString());
- event.put("publisher", "foo.com");
- event.put("advertiser", "google.com");
- event.put("click", i++);
- _collector.emit(new Values(event));
- }
-
- @Override
- public void ack(Object id) {
- }
-
- @Override
- public void fail(Object id) {
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(DEFAULT_FIELDS);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b6cab8dc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 009d0f0..b5f3a5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -381,7 +381,6 @@
<module>external/storm-opentsdb</module>
<module>external/storm-kafka-monitor</module>
<module>external/storm-kinesis</module>
- <module>external/storm-druid</module>
<module>external/storm-jms</module>
<module>external/storm-pmml</module>
<module>external/storm-rocketmq</module>
[2/2] storm git commit: Merge branch 'STORM-3097-merge'
Posted by ka...@apache.org.
Merge branch 'STORM-3097-merge'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0eb6b511
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0eb6b511
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0eb6b511
Branch: refs/heads/master
Commit: 0eb6b5116f251e17b6f14a61cebfadfc286faa59
Parents: 4aa6b5e b6cab8d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Jun 9 20:44:03 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jun 9 20:44:03 2018 +0900
----------------------------------------------------------------------
docs/index.md | 1 -
docs/storm-druid.md | 119 ---------------
external/storm-druid/README.md | 147 -------------------
external/storm-druid/pom.xml | 111 --------------
.../apache/storm/druid/bolt/DruidBeamBolt.java | 116 ---------------
.../storm/druid/bolt/DruidBeamFactory.java | 29 ----
.../apache/storm/druid/bolt/DruidConfig.java | 104 -------------
.../druid/bolt/ITupleDruidEventMapper.java | 38 -----
.../storm/druid/bolt/TupleDruidEventMapper.java | 44 ------
.../storm/druid/trident/DruidBeamState.java | 96 ------------
.../druid/trident/DruidBeamStateFactory.java | 42 ------
.../druid/trident/DruidBeamStateUpdater.java | 48 ------
.../storm/druid/SampleDruidBeamFactoryImpl.java | 122 ---------------
.../storm/druid/SampleDruidBoltTopology.java | 94 ------------
.../druid/SampleDruidBoltTridentTopology.java | 90 ------------
.../apache/storm/druid/SimpleBatchSpout.java | 95 ------------
.../org/apache/storm/druid/SimpleSpout.java | 68 ---------
pom.xml | 1 -
18 files changed, 1365 deletions(-)
----------------------------------------------------------------------