Posted to commits@storm.apache.org by sr...@apache.org on 2016/06/15 17:15:15 UTC

[1/2] storm git commit: STORM-1893 OpenTSDB bolt and trident state for storing any timeseries data. - Addressed review comments - Fixed issues found while testing.

Repository: storm
Updated Branches:
  refs/heads/master f325febd2 -> 8ea9e5b9d


STORM-1893 OpenTSDB bolt and trident state for storing any timeseries data.
    - Addressed review comments
    - Fixed issues found while testing.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28e2d489
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28e2d489
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28e2d489

Branch: refs/heads/master
Commit: 28e2d4894390956ae995ec2fb7da1cafdc80d2ee
Parents: f325feb
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Jun 8 11:35:42 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Wed Jun 15 12:48:03 2016 +0530

----------------------------------------------------------------------
 external/storm-opentsdb/README.md               |  76 ++++++++
 external/storm-opentsdb/pom.xml                 |  97 ++++++++++
 .../storm/opentsdb/OpenTsdbMetricDatapoint.java | 119 ++++++++++++
 .../bolt/ITupleOpenTsdbDatapointMapper.java     |  39 ++++
 .../storm/opentsdb/bolt/OpenTsdbBolt.java       | 179 +++++++++++++++++++
 .../bolt/TupleOpenTsdbDatapointMapper.java      | 120 +++++++++++++
 .../storm/opentsdb/client/ClientResponse.java   | 178 ++++++++++++++++++
 .../storm/opentsdb/client/OpenTsdbClient.java   | 153 ++++++++++++++++
 .../storm/opentsdb/trident/OpenTsdbState.java   |  89 +++++++++
 .../opentsdb/trident/OpenTsdbStateFactory.java  |  50 ++++++
 .../opentsdb/trident/OpenTsdbStateUpdater.java  |  36 ++++
 .../storm/opentsdb/MetricGenBatchSpout.java     |  94 ++++++++++
 .../apache/storm/opentsdb/MetricGenSpout.java   |  72 ++++++++
 .../opentsdb/SampleOpenTsdbBoltTopology.java    |  70 ++++++++
 .../opentsdb/SampleOpenTsdbTridentTopology.java |  87 +++++++++
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |   8 +-
 17 files changed, 1467 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/README.md
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/README.md b/external/storm-opentsdb/README.md
new file mode 100644
index 0000000..39db525
--- /dev/null
+++ b/external/storm-opentsdb/README.md
@@ -0,0 +1,76 @@
+# Storm OpenTSDB Bolt and TridentState
+  
+OpenTSDB offers scalable and highly available storage for time series data. It consists of
+Time Series Daemon (TSD) servers along with command line utilities. Each TSD connects to the
+configured HBase cluster to push/query the data.
+
+A time series data point consists of:
+ - a metric name.
+ - a UNIX timestamp (seconds or milliseconds since Epoch).
+ - a value (64 bit integer or single-precision floating point value).
+ - a set of tags (key-value pairs) that describe the time series the point belongs to.
+
+The Storm bolt and Trident state create the above time series data points from a tuple based on the given `TupleOpenTsdbDatapointMapper`.
+
+This module provides a core Storm bolt and a Trident state implementation for writing data to OpenTSDB.
+
+Time series data points are written with an at-least-once guarantee, so duplicate data points should be handled as described [here](http://opentsdb.net/docs/build/html/user_guide/writing.html#duplicate-data-points) in the OpenTSDB documentation.
+
+## Examples
+
+### Core Bolt
+The example below shows how to use the core bolt, `org.apache.storm.opentsdb.bolt.OpenTsdbBolt`:
+
+```java
+
+        OpenTsdbClient.Builder builder =  OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();
+        final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+        openTsdbBolt.withBatchSize(10).withFlushInterval(2); // flush interval is given in seconds
+        topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");
+        
+```
+
+
+### Trident State
+
+```java
+
+        final OpenTsdbStateFactory openTsdbStateFactory =
+                new OpenTsdbStateFactory(OpenTsdbClient.newBuilder(tsdbUrl),
+                        Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+        TridentTopology tridentTopology = new TridentTopology();
+        
+        final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenSpout());
+        
+        stream.peek(new Consumer() {
+            @Override
+            public void accept(TridentTuple input) {
+                LOG.info("########### Received tuple: [{}]", input);
+            }
+        }).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater());
+        
+```
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))
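
Note on the data point format described in the README above: OpenTSDB's HTTP API accepts data points as JSON objects on its `/api/put` endpoint, with the same four parts (metric, timestamp, value, tags). The sketch below only illustrates that shape. `PutPayloadSketch` and its methods are hypothetical names and are not part of this commit; how the module's own client serializes data points is not shown in this part of the diff.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of storm-opentsdb: renders one data point
// as a JSON object in the shape accepted by OpenTSDB's /api/put endpoint.
final class PutPayloadSketch {

    static String toJson(String metric, long timestamp, Number value, Map<String, String> tags) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"metric\":\"").append(escape(metric)).append("\",");
        sb.append("\"timestamp\":").append(timestamp).append(',');
        sb.append("\"value\":").append(value).append(',');
        sb.append("\"tags\":{");
        boolean first = true;
        // Copy into a TreeMap so the tag order in the output is deterministic.
        for (Map.Entry<String, String> e : new TreeMap<String, String>(tags).entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
            first = false;
        }
        return sb.append("}}").toString();
    }

    // Minimal escaping: backslash and double quote only (an assumption that
    // metric and tag strings contain no control characters).
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        Map<String, String> tags = new TreeMap<String, String>();
        tags.put("host", "web01");
        tags.put("dc", "lga");
        System.out.println(toJson("sys.cpu.nice", 1346846400L, 18, tags));
    }
}
```

A real client would normally delegate this to a JSON library; the hand-rolled builder is used here only to keep the sketch free of dependencies.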

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/pom.xml b/external/storm-opentsdb/pom.xml
new file mode 100644
index 0000000..0e01cdf
--- /dev/null
+++ b/external/storm-opentsdb/pom.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-opentsdb</artifactId>
+
+    <developers>
+        <developer>
+            <id>satishd</id>
+            <name>Satish Duggana</name>
+            <email>satish.duggana@gmail.com</email>
+        </developer>
+    </developers>
+
+    <properties>
+        <jersey.version>2.23</jersey.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-client</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.media</groupId>
+            <artifactId>jersey-media-json-jackson</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.connectors</groupId>
+            <artifactId>jersey-apache-connector</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+
+        <!--test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/OpenTsdbMetricDatapoint.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/OpenTsdbMetricDatapoint.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/OpenTsdbMetricDatapoint.java
new file mode 100644
index 0000000..b04d817
--- /dev/null
+++ b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/OpenTsdbMetricDatapoint.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * This class represents a metric data point in OpenTSDB's format.
+ */
+public class OpenTsdbMetricDatapoint implements Serializable {
+
+    // metric name
+    private final String metric;
+
+    // map of tag value pairs
+    private final Map<String, String> tags;
+
+    // timestamp, in either seconds or milliseconds, at which this metric occurred.
+    private final long timestamp;
+
+    // value of the metric
+    private final Number value;
+
+    // required for jackson serialization
+    private OpenTsdbMetricDatapoint() {
+        this(null, null, 0L, null);
+    }
+
+    public OpenTsdbMetricDatapoint(String metric, Map<String, String> tags, long timestamp, Number value) {
+        this.metric = metric;
+        this.tags = Collections.unmodifiableMap(tags);
+        this.timestamp = timestamp;
+        this.value = value;
+
+        if (!(value instanceof Integer || value instanceof Long || value instanceof Float)) {
+            throw new RuntimeException("Received tuple contains unsupported value: " + value + ". It must be an Integer, Long or Float.");
+        }
+
+    }
+
+    /**
+     * @return metric name of this datapoint
+     */
+    public String getMetric() {
+        return metric;
+    }
+
+    /**
+     * @return Map of tag/value pairs of this metric
+     */
+    public Map<String, String> getTags() {
+        return tags;
+    }
+
+    /**
+     * @return timestamp, in either seconds or milliseconds, at which this metric occurred.
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * @return value of this metric datapoint
+     */
+    public Object getValue() {
+        return value;
+    }
+
+    @Override
+    public String toString() {
+        return "OpenTsdbMetricDatapoint{" +
+                "metric='" + metric + '\'' +
+                ", tags=" + tags +
+                ", timestamp=" + timestamp +
+                ", value=" + value +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof OpenTsdbMetricDatapoint)) return false;
+
+        OpenTsdbMetricDatapoint that = (OpenTsdbMetricDatapoint) o;
+
+        if (timestamp != that.timestamp) return false;
+        if (!value.equals(that.value)) return false;
+        if (!metric.equals(that.metric)) return false;
+        return tags.equals(that.tags);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = metric.hashCode();
+        result = 31 * result + tags.hashCode();
+        result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+        result = 31 * result + value.hashCode();
+        return result;
+    }
+}
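
Note on equality: the bolt in this commit keys a `HashMap` by `OpenTsdbMetricDatapoint` and later looks entries up by the data point reported in an error, so two data points holding the same boxed value need to compare as equal. The sketch below shows value-based equality on a trimmed-down stand-in class. `PointEqualitySketch` is a hypothetical name, and the use of `java.util.Objects` is this sketch's choice rather than something taken from the commit.

```java
import java.util.Objects;

// Hypothetical, trimmed-down stand-in for a data point; only the fields
// needed to show value-based equality are kept (tags are omitted).
final class PointEqualitySketch {
    private final String metric;
    private final long timestamp;
    private final Number value;

    PointEqualitySketch(String metric, long timestamp, Number value) {
        this.metric = metric;
        this.timestamp = timestamp;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PointEqualitySketch)) return false;
        PointEqualitySketch that = (PointEqualitySketch) o;
        // Objects.equals compares boxed numbers by value and tolerates nulls,
        // whereas == / != on Number references compares object identity.
        return timestamp == that.timestamp
                && Objects.equals(metric, that.metric)
                && Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(metric, timestamp, value);
    }

    public static void main(String[] args) {
        PointEqualitySketch a = new PointEqualitySketch("sys.cpu.nice", 1346846400L, Long.valueOf(1000L));
        PointEqualitySketch b = new PointEqualitySketch("sys.cpu.nice", 1346846400L, Long.valueOf(1000L));
        System.out.println(a.equals(b));
    }
}
```

Note that `Long.valueOf(1000L)` and `Integer.valueOf(1000)` are still not equal under `equals`, so a mapper should produce a consistent numeric type for a given metric.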

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/ITupleOpenTsdbDatapointMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/ITupleOpenTsdbDatapointMapper.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/ITupleOpenTsdbDatapointMapper.java
new file mode 100644
index 0000000..46b2471
--- /dev/null
+++ b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/ITupleOpenTsdbDatapointMapper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb.bolt;
+
+import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint;
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * This interface maps an {@link ITuple} to an {@link OpenTsdbMetricDatapoint}.
+ *
+ */
+public interface ITupleOpenTsdbDatapointMapper extends Serializable {
+
+    /**
+     * Returns a {@link OpenTsdbMetricDatapoint} for a given {@code tuple}.
+     *
+     * @param tuple tuple instance
+     */
+    public OpenTsdbMetricDatapoint getMetricPoint(ITuple tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/OpenTsdbBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/OpenTsdbBolt.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/OpenTsdbBolt.java
new file mode 100644
index 0000000..34be978
--- /dev/null
+++ b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/OpenTsdbBolt.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb.bolt;
+
+import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint;
+import org.apache.storm.opentsdb.client.ClientResponse;
+import org.apache.storm.opentsdb.client.OpenTsdbClient;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.BatchHelper;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
+ * <p/>
+ * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
+ * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
+ * <p/>
+ *
+ * Example topology:
+ * <blockquote><pre>
+ * OpenTsdbClient.Builder builder =  OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();
+ * final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+ * openTsdbBolt.withBatchSize(10).withFlushInterval(2).failTupleForFailedMetrics();
+ * topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");
+ * </pre></blockquote>
+ *
+ */
+public class OpenTsdbBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class);
+
+    private final OpenTsdbClient.Builder openTsdbClientBuilder;
+    private final List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers;
+    private int batchSize;
+    private int flushIntervalInSeconds;
+    private boolean failTupleForFailedMetrics;
+
+    private BatchHelper batchHelper;
+    private OpenTsdbClient openTsdbClient;
+    private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>();
+    private OutputCollector collector;
+
+    public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) {
+        this.openTsdbClientBuilder = openTsdbClientBuilder;
+        this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers;
+    }
+
+    public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) {
+        this.flushIntervalInSeconds = flushIntervalInSeconds;
+        return this;
+    }
+
+    public OpenTsdbBolt withBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    /**
+     * When it is invoked, this bolt acks only the tuples which have successful metrics stored into OpenTSDB and fails
+     * the respective tuples of the failed metrics.
+     *
+     * @return same instance by setting {@code failTupleForFailedMetrics} to true
+     */
+    public OpenTsdbBolt failTupleForFailedMetrics() {
+        this.failTupleForFailedMetrics = true;
+        return this;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        batchHelper = new BatchHelper(batchSize, collector);
+        openTsdbClient = openTsdbClientBuilder.build();
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            if (batchHelper.shouldHandle(tuple)) {
+                final List<OpenTsdbMetricDatapoint> metricDataPoints = getMetricPoints(tuple);
+                for (OpenTsdbMetricDatapoint metricDataPoint : metricDataPoints) {
+                    metricPointsWithTuple.put(metricDataPoint, tuple);
+                }
+                batchHelper.addBatch(tuple);
+            }
+
+            if (batchHelper.shouldFlush()) {
+                LOG.debug("Sending metrics of size [{}]", metricPointsWithTuple.size());
+
+                ClientResponse.Details clientResponse = openTsdbClient.writeMetricPoints(metricPointsWithTuple.keySet());
+
+                if (failTupleForFailedMetrics && clientResponse != null && clientResponse.getFailed() > 0) {
+                    final List<ClientResponse.Details.Error> errors = clientResponse.getErrors();
+                    LOG.error("Some of the metric points failed with errors: [{}]", clientResponse);
+                    if (errors != null && !errors.isEmpty()) {
+
+                        Set<Tuple> failedTuples = new HashSet<>();
+                        for (ClientResponse.Details.Error error : errors) {
+                            final Tuple failedTuple = metricPointsWithTuple.get(error.getDatapoint());
+                            if (failedTuple != null) {
+                                failedTuples.add(failedTuple);
+                            }
+                        }
+
+                        for (Tuple batchedTuple : batchHelper.getBatchTuples()) {
+                            if (failedTuples.contains(batchedTuple)) {
+                                collector.fail(batchedTuple);
+                            } else {
+                                collector.ack(batchedTuple);
+                            }
+                        }
+
+                    } else {
+                        throw new RuntimeException("Some of the metric points failed with details: " + clientResponse);
+                    }
+                } else {
+                    LOG.debug("Acknowledging batched tuples");
+                    batchHelper.ack();
+                }
+                metricPointsWithTuple.clear();
+            }
+        } catch (Exception e) {
+            batchHelper.fail(e);
+            metricPointsWithTuple.clear();
+        }
+    }
+
+    private List<OpenTsdbMetricDatapoint> getMetricPoints(Tuple tuple) {
+        List<OpenTsdbMetricDatapoint> metricDataPoints = new ArrayList<>();
+        for (TupleOpenTsdbDatapointMapper tupleOpenTsdbDatapointMapper : tupleOpenTsdbDatapointMappers) {
+            metricDataPoints.add(tupleOpenTsdbDatapointMapper.getMetricPoint(tuple));
+        }
+
+        return metricDataPoints;
+    }
+
+    @Override
+    public void cleanup() {
+        openTsdbClient.cleanup();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // This bolt is a sink; it emits no tuples.
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalInSeconds);
+    }
+}
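
Note on batching: the bolt above collects data points until either the configured batch size is reached or a tick tuple arrives, and only then writes to OpenTSDB. The sketch below shows that size-or-tick pattern in isolation. `SizeOrTickBatcherSketch` and its methods are hypothetical names; this is not Storm's `BatchHelper`, whose implementation is not part of this diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for size-or-tick batching; not Storm's BatchHelper.
final class SizeOrTickBatcherSketch<T> {
    private final int batchSize;
    private final List<T> pending = new ArrayList<T>();
    private boolean forceFlush;

    SizeOrTickBatcherSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called for every regular item; requests a flush once the batch is full.
    void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            forceFlush = true;
        }
    }

    // Called when the periodic timer (a tick) fires.
    void onTick() {
        forceFlush = true;
    }

    // A flush is due only if one was requested and there is something to send.
    boolean shouldFlush() {
        return forceFlush && !pending.isEmpty();
    }

    // Hands the batch to the caller and resets the state for the next batch.
    List<T> drain() {
        List<T> batch = new ArrayList<T>(pending);
        pending.clear();
        forceFlush = false;
        return batch;
    }

    public static void main(String[] args) {
        SizeOrTickBatcherSketch<String> batcher = new SizeOrTickBatcherSketch<String>(2);
        batcher.add("a");
        System.out.println(batcher.shouldFlush()); // batch not full and no tick yet
        batcher.onTick();
        System.out.println(batcher.shouldFlush()); // the tick requests a flush
        System.out.println(batcher.drain());
    }
}
```

Resetting both the pending list and the flush flag in one place (`drain` here) keeps every flush path, successful or not, from leaving stale items behind for the next batch.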

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java
new file mode 100644
index 0000000..8953535
--- /dev/null
+++ b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb.bolt;
+
+import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint;
+import org.apache.storm.tuple.ITuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Converts {@link org.apache.storm.tuple.ITuple} to {@link OpenTsdbMetricDatapoint}
+ */
+public final class TupleOpenTsdbDatapointMapper implements ITupleOpenTsdbDatapointMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(TupleOpenTsdbDatapointMapper.class);
+
+    /**
+     * Default mapper which can be used when the tuple already contains fields named metric, timestamp, tags and value.
+     */
+    public static final TupleOpenTsdbDatapointMapper DEFAULT_MAPPER =
+            new TupleOpenTsdbDatapointMapper("metric", "timestamp", "tags", "value");
+
+    private final String metricField;
+    private final String timestampField;
+    private final String valueField;
+    private final String tagsField;
+
+    public TupleOpenTsdbDatapointMapper(String metricField, String timestampField, String tagsField, String valueField) {
+        this.metricField = metricField;
+        this.timestampField = timestampField;
+        this.tagsField = tagsField;
+        this.valueField = valueField;
+    }
+
+    @Override
+    public OpenTsdbMetricDatapoint getMetricPoint(ITuple tuple) {
+            return new OpenTsdbMetricDatapoint(
+                    tuple.getStringByField(metricField),
+                    (Map<String, String>) tuple.getValueByField(tagsField),
+                    tuple.getLongByField(timestampField),
+                    (Number) tuple.getValueByField(valueField));
+    }
+
+    /**
+     * @return metric field name in the tuple.
+     */
+    public String getMetricField() {
+        return metricField;
+    }
+
+    /**
+     * @return timestamp field name in the tuple.
+     */
+    public String getTimestampField() {
+        return timestampField;
+    }
+
+    /**
+     * @return value field name in the tuple.
+     */
+    public String getValueField() {
+        return valueField;
+    }
+
+    /**
+     * @return tags field name in the tuple
+     */
+    public String getTagsField() {
+        return tagsField;
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof TupleOpenTsdbDatapointMapper)) return false;
+
+        TupleOpenTsdbDatapointMapper that = (TupleOpenTsdbDatapointMapper) o;
+
+        if (!metricField.equals(that.metricField)) return false;
+        if (!timestampField.equals(that.timestampField)) return false;
+        if (!valueField.equals(that.valueField)) return false;
+        return tagsField.equals(that.tagsField);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = metricField.hashCode();
+        result = 31 * result + timestampField.hashCode();
+        result = 31 * result + valueField.hashCode();
+        result = 31 * result + tagsField.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "TupleOpenTsdbDatapointMapper{" +
+                "metricField='" + metricField + '\'' +
+                ", timestampField='" + timestampField + '\'' +
+                ", valueField='" + valueField + '\'' +
+                ", tagsField='" + tagsField + '\'' +
+                '}';
+    }
+}
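
Note on custom field names: the mapper above looks up four configurable field names in the tuple and validates nothing itself; the Integer/Long/Float check lives in the data point constructor. The sketch below mirrors that lookup with a plain `Map` standing in for a Storm tuple, so it runs without Storm on the classpath. `FieldMapperSketch`, its nested `Point` class, and the field names used in `main` are all hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a Map<String, Object> stands in for a Storm tuple.
final class FieldMapperSketch {

    // Simplified data point holder for this sketch only.
    static final class Point {
        final String metric;
        final long timestamp;
        final Map<String, String> tags;
        final Number value;

        Point(String metric, long timestamp, Map<String, String> tags, Number value) {
            this.metric = metric;
            this.timestamp = timestamp;
            this.tags = tags;
            this.value = value;
        }
    }

    private final String metricField;
    private final String timestampField;
    private final String tagsField;
    private final String valueField;

    FieldMapperSketch(String metricField, String timestampField, String tagsField, String valueField) {
        this.metricField = metricField;
        this.timestampField = timestampField;
        this.tagsField = tagsField;
        this.valueField = valueField;
    }

    Point map(Map<String, Object> tuple) {
        Object value = tuple.get(valueField);
        // Same type restriction as the data point class in this commit.
        if (!(value instanceof Integer || value instanceof Long || value instanceof Float)) {
            throw new IllegalArgumentException("Unsupported value: " + value);
        }
        @SuppressWarnings("unchecked")
        Map<String, String> tags = (Map<String, String>) tuple.get(tagsField);
        return new Point((String) tuple.get(metricField), (Long) tuple.get(timestampField), tags, (Number) value);
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new HashMap<String, Object>();
        tuple.put("name", "sys.cpu.nice");
        tuple.put("ts", 1346846400L);
        tuple.put("labels", Collections.singletonMap("host", "web01"));
        tuple.put("reading", 18L);

        Point p = new FieldMapperSketch("name", "ts", "labels", "reading").map(tuple);
        System.out.println(p.metric + " " + p.timestamp + " " + p.value + " " + p.tags);
    }
}
```

With the real class, the equivalent would be constructing `TupleOpenTsdbDatapointMapper` with the upstream component's field names instead of relying on `DEFAULT_MAPPER`.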

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/ClientResponse.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/ClientResponse.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/ClientResponse.java
new file mode 100644
index 0000000..7c3e191
--- /dev/null
+++ b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/ClientResponse.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb.client;
+
+import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * This interface represents the response from OpenTSDB for a sent request.
+ */
+public interface ClientResponse extends Serializable {
+
+
+    public class Summary implements ClientResponse {
+        private int failed;
+        private int success;
+        private int timeouts;
+
+        public Summary() {
+        }
+
+        public Summary(int success, int failed, int timeouts) {
+            this.failed = failed;
+            this.success = success;
+            this.timeouts = timeouts;
+        }
+
+        public int getFailed() {
+            return failed;
+        }
+
+        public int getSuccess() {
+            return success;
+        }
+
+        public int getTimeouts() {
+            return timeouts;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Summary)) return false;
+
+            Summary summary = (Summary) o;
+
+            if (failed != summary.failed) return false;
+            if (success != summary.success) return false;
+            return timeouts == summary.timeouts;
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = failed;
+            result = 31 * result + success;
+            result = 31 * result + timeouts;
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "Summary{" +
+                    "failed=" + failed +
+                    ", success=" + success +
+                    ", timeouts=" + timeouts +
+                    '}';
+        }
+    }
+
+    public class Details extends Summary {
+        private List<Error> errors;
+
+        public Details() {
+        }
+
+        public Details(int success, int failed, int timeouts, List<Error> errors) {
+            super(success, failed, timeouts);
+            this.errors = errors;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Details)) return false;
+            if (!super.equals(o)) return false;
+
+            Details details = (Details) o;
+
+            return errors != null ? errors.equals(details.errors) : details.errors == null;
+
+        }
+
+        public List<Error> getErrors() {
+            return errors;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = super.hashCode();
+            result = 31 * result + (errors != null ? errors.hashCode() : 0);
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "Details{" +
+                    "errors=" + errors +
+                    ", " + super.toString() + '}';
+        }
+
+        public static class Error implements Serializable {
+            private String error;
+            private OpenTsdbMetricDatapoint datapoint;
+
+            public Error() {
+            }
+
+            public Error(String error, OpenTsdbMetricDatapoint datapoint) {
+                this.error = error;
+                this.datapoint = datapoint;
+            }
+
+            public String getError() {
+                return error;
+            }
+
+            public OpenTsdbMetricDatapoint getDatapoint() {
+                return datapoint;
+            }
+
+            @Override
+            public boolean equals(Object o) {
+                if (this == o) return true;
+                if (!(o instanceof Error)) return false;
+
+                Error error1 = (Error) o;
+
+                if (error != null ? !error.equals(error1.error) : error1.error != null) return false;
+                return datapoint != null ? datapoint.equals(error1.datapoint) : error1.datapoint == null;
+
+            }
+
+            @Override
+            public int hashCode() {
+                int result = error != null ? error.hashCode() : 0;
+                result = 31 * result + (datapoint != null ? datapoint.hashCode() : 0);
+                return result;
+            }
+
+            @Override
+            public String toString() {
+                return "Error{" +
+                        "error='" + error + '\'' +
+                        ", datapoint=" + datapoint +
+                        '}';
+            }
+        }
+    }
+}
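
A note on the response classes above: they expose no-arg constructors, so an instance can exist with its object-typed fields still unset, and equals/hashCode then need to tolerate null. The following is only a minimal sketch of that idea using java.util.Objects. The class name NullSafeDetails and the use of List<String> for the errors are illustrative assumptions, not part of this commit.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a response type whose error list may be absent.
final class NullSafeDetails {
    private final int success;
    private final int failed;
    private final List<String> errors; // may be null when no details were returned

    NullSafeDetails(int success, int failed, List<String> errors) {
        this.success = success;
        this.failed = failed;
        this.errors = errors;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof NullSafeDetails)) return false;
        NullSafeDetails that = (NullSafeDetails) o;
        // Objects.equals treats two nulls as equal and never dereferences null.
        return success == that.success
                && failed == that.failed
                && Objects.equals(errors, that.errors);
    }

    @Override
    public int hashCode() {
        // Objects.hash accepts null elements.
        return Objects.hash(success, failed, errors);
    }
}
```

With this shape, two instances that both lack an error list compare equal instead of throwing.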

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/OpenTsdbClient.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/OpenTsdbClient.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/OpenTsdbClient.java
new file mode 100644
index 0000000..0ce807f
--- /dev/null
+++ b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/OpenTsdbClient.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb.client;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint;
+import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.RequestEntityProcessing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * Client to connect to OpenTsdb TSD for storing timeseries datapoints.
+ */
+public class OpenTsdbClient {
+    private static final String PUT_PATH = "/api/put";
+    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbClient.class);
+
+    private final String urlString;
+    private final boolean sync;
+    private final long syncTimeout;
+    private final ResponseType responseType;
+    private final boolean enableChunkedEncoding;
+
+    private WebTarget target;
+    private Client client;
+
+    public enum ResponseType {
+        None(""),
+        Summary("summary"),
+        Details("details");
+
+        private final String value;
+
+        ResponseType(String value) {
+            this.value = value;
+        }
+    }
+
+    protected OpenTsdbClient(String urlString, boolean sync, long syncTimeOut, ResponseType responseType, boolean enableChunkedEncoding) {
+        this.urlString = urlString;
+        this.sync = sync;
+        this.syncTimeout = syncTimeOut;
+        this.responseType = responseType;
+        this.enableChunkedEncoding = enableChunkedEncoding;
+
+        init();
+    }
+
+    private void init() {
+
+        final ApacheConnectorProvider apacheConnectorProvider = new ApacheConnectorProvider();
+        final ClientConfig clientConfig = new ClientConfig().connectorProvider(apacheConnectorProvider);
+
+        // set request entity processing explicitly, since jersey turns chunked transfer encoding on by default.
+        clientConfig.property(ClientProperties.REQUEST_ENTITY_PROCESSING,
+                enableChunkedEncoding ? RequestEntityProcessing.CHUNKED : RequestEntityProcessing.BUFFERED);
+
+        client = ClientBuilder.newClient(clientConfig);
+
+        target = client.target(urlString).path(PUT_PATH);
+        if(sync) {
+            // need to add an empty string, else it is not added as a query param.
+            target = target.queryParam("sync", "").queryParam("sync_timeout", syncTimeout);
+        }
+        if(responseType != ResponseType.None) {
+            // need to add an empty string, else it is not added as a query param.
+            target = target.queryParam(responseType.value, "");
+        }
+
+        LOG.info("target uri [{}]", target.getUri());
+    }
+
+    public ClientResponse.Details writeMetricPoint(OpenTsdbMetricDatapoint metricDataPoint) {
+        return target.request().post(Entity.json(metricDataPoint), ClientResponse.Details.class);
+    }
+
+    public ClientResponse.Details writeMetricPoints(Collection<OpenTsdbMetricDatapoint> metricDataPoints) {
+        LOG.debug("Writing metric points to OpenTSDB [{}]", metricDataPoints.size());
+        return target.request().post(Entity.json(metricDataPoints), ClientResponse.Details.class);
+    }
+
+    public void cleanup() {
+        client.close();
+    }
+
+    public static OpenTsdbClient.Builder newBuilder(String url) {
+        return new Builder(url);
+    }
+
+    public static class Builder implements Serializable {
+        private final String url;
+        private boolean sync;
+        private long syncTimeOut;
+        private boolean enableChunkedEncoding;
+        private ResponseType responseType = ResponseType.None;
+
+        protected Builder(String url) {
+            this.url = url;
+        }
+
+        public OpenTsdbClient.Builder sync(long timeoutInMilliSecs) {
+            Preconditions.checkArgument(timeoutInMilliSecs > 0, "timeout value should be more than zero.");
+            sync = true;
+            syncTimeOut = timeoutInMilliSecs;
+            return this;
+        }
+
+        public OpenTsdbClient.Builder returnSummary() {
+            responseType = ResponseType.Summary;
+            return this;
+        }
+
+        public OpenTsdbClient.Builder returnDetails() {
+            responseType = ResponseType.Details;
+            return this;
+        }
+
+        public Builder enableChunkedEncoding() {
+            enableChunkedEncoding = true;
+            return this;
+        }
+
+        public OpenTsdbClient build() {
+            return new OpenTsdbClient(url, sync, syncTimeOut, responseType, enableChunkedEncoding);
+        }
+    }
+}
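
To make the effect of the builder options in init() easier to see, here is a rough sketch of the request URI they lead to. It uses plain string building rather than Jersey's WebTarget, so the exact rendering of the empty-valued parameters ("sync=", "details=") is an assumption about how Jersey serializes them. The class name PutUriSketch and the host localhost:4242 are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mirrors the order in which init() adds query parameters.
final class PutUriSketch {

    /**
     * @param baseUrl       TSD base URL, e.g. "http://localhost:4242" (hypothetical host)
     * @param syncTimeoutMs sync timeout in milliseconds; a value <= 0 means "not sync"
     * @param responseType  "summary", "details", or "" for no response body
     */
    static String putUri(String baseUrl, long syncTimeoutMs, String responseType) {
        List<String> params = new ArrayList<>();
        if (syncTimeoutMs > 0) {
            params.add("sync=");                         // flag-style parameter with empty value
            params.add("sync_timeout=" + syncTimeoutMs);
        }
        if (responseType != null && !responseType.isEmpty()) {
            params.add(responseType + "=");              // "summary=" or "details="
        }
        String uri = baseUrl + "/api/put";
        return params.isEmpty() ? uri : uri + "?" + String.join("&", params);
    }

    public static void main(String[] args) {
        // Corresponds to newBuilder(url).sync(30_000).returnDetails()
        System.out.println(putUri("http://localhost:4242", 30_000, "details"));
    }
}
```

A builder with neither sync(...) nor returnSummary()/returnDetails() would, under the same assumptions, post to the bare /api/put path.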

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbState.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbState.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbState.java
new file mode 100644
index 0000000..02698d4
--- /dev/null
+++ b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbState.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb.trident;
+
+import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint;
+import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
+import org.apache.storm.opentsdb.client.ClientResponse;
+import org.apache.storm.opentsdb.client.OpenTsdbClient;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Trident {@link State} implementation for OpenTSDB.
+ */
+public class OpenTsdbState implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbState.class);
+
+    private final Map conf;
+    private final OpenTsdbClient.Builder openTsdbClientBuilder;
+    private final Iterable<TupleOpenTsdbDatapointMapper> tupleMetricPointMappers;
+    private OpenTsdbClient openTsdbClient;
+
+    public OpenTsdbState(Map conf, OpenTsdbClient.Builder openTsdbClientBuilder, Iterable<TupleOpenTsdbDatapointMapper> tupleMetricPointMappers) {
+        this.conf = conf;
+        this.openTsdbClientBuilder = openTsdbClientBuilder;
+        this.tupleMetricPointMappers = tupleMetricPointMappers;
+    }
+
+    public void prepare() {
+        openTsdbClient = openTsdbClientBuilder.build();
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+
+    }
+
+    @Override
+    public void commit(Long txid) {
+
+    }
+
+    public void update(List<TridentTuple> tridentTuples, TridentCollector collector) {
+        try {
+            List<OpenTsdbMetricDatapoint> metricDataPoints = new ArrayList<>();
+            for (TridentTuple tridentTuple : tridentTuples) {
+                for (TupleOpenTsdbDatapointMapper tupleOpenTsdbDatapointMapper : tupleMetricPointMappers) {
+                    metricDataPoints.add(tupleOpenTsdbDatapointMapper.getMetricPoint(tridentTuple));
+                }
+            }
+            final ClientResponse.Details details = openTsdbClient.writeMetricPoints(metricDataPoints);
+
+            if(details != null && (details.getFailed() > 0) ) {
+                final String errorMsg = "Failed in writing metrics to TSDB with details: " + details;
+                LOG.error(errorMsg);
+                throw new RuntimeException(errorMsg);
+            }
+
+        } catch (Exception e) {
+            collector.reportError(e);
+            throw new FailedException(e);
+        }
+
+    }
+}
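
The shape of update() above (every mapper applied to every tuple, one write for the whole batch, and the batch failed if any datapoint was rejected) can be sketched without Storm or OpenTSDB types. This is only an illustration: FanOutSketch, the use of java.util.function.Function in place of the mapper class, and IllegalStateException in place of FailedException are assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative only: the mapping and failure check of a batch update.
final class FanOutSketch {

    /** Applies every mapper to every tuple, keeping tuple order, then mapper order. */
    static <T, D> List<D> toDatapoints(List<T> tuples, List<Function<T, D>> mappers) {
        List<D> datapoints = new ArrayList<>();
        for (T tuple : tuples) {
            for (Function<T, D> mapper : mappers) {
                datapoints.add(mapper.apply(tuple));
            }
        }
        return datapoints;
    }

    /** Any rejected datapoint fails the whole batch, so the batch can be replayed. */
    static void failOnErrors(int failedCount) {
        if (failedCount > 0) {
            throw new IllegalStateException(
                    "Failed in writing metrics to TSDB, failed=" + failedCount);
        }
    }
}
```

With N tuples and M mappers a single write carries N * M datapoints. Because a failure replays the whole batch, datapoints that were already accepted will be written again on the retry.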

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateFactory.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateFactory.java
new file mode 100644
index 0000000..20890c9
--- /dev/null
+++ b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb.trident;
+
+import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
+import org.apache.storm.opentsdb.client.OpenTsdbClient;
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Trident {@link StateFactory} implementation for OpenTSDB.
+ */
+public class OpenTsdbStateFactory implements StateFactory {
+
+    private OpenTsdbClient.Builder builder;
+    private final List<TupleOpenTsdbDatapointMapper> tridentTupleOpenTsdbDatapointMappers;
+
+    public OpenTsdbStateFactory(OpenTsdbClient.Builder builder, List<TupleOpenTsdbDatapointMapper> tridentTupleOpenTsdbDatapointMappers) {
+        this.builder = builder;
+        this.tridentTupleOpenTsdbDatapointMappers = tridentTupleOpenTsdbDatapointMappers;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        final OpenTsdbState openTsdbState = new OpenTsdbState(conf, builder, tridentTupleOpenTsdbDatapointMappers);
+        openTsdbState.prepare();
+
+        return openTsdbState;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateUpdater.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateUpdater.java
new file mode 100644
index 0000000..935611f
--- /dev/null
+++ b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateUpdater.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb.trident;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ * Trident {@link org.apache.storm.trident.state.StateUpdater} implementation for OpenTSDB.
+ */
+public class OpenTsdbStateUpdater extends BaseStateUpdater<OpenTsdbState> {
+
+    @Override
+    public void updateState(OpenTsdbState state, List<TridentTuple> tuples, TridentCollector collector) {
+        state.update(tuples, collector);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java b/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java
new file mode 100644
index 0000000..b0580f6
--- /dev/null
+++ b/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * BatchSpout implementation for metrics generation.
+ */
+public class MetricGenBatchSpout implements IBatchSpout {
+
+    private int batchSize;
+    private final Map<Long, List<List<Object>>> batches = new HashMap<>();
+
+    public MetricGenBatchSpout(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) {
+
+    }
+
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        List<List<Object>> values;
+        if(batches.containsKey(batchId)) {
+            values = batches.get(batchId);
+        } else {
+            values = new ArrayList<>();
+            for (int i = 0; i < batchSize; i++) {
+                // tuple values are mapped with
+                // metric, timestamp, value, Map of tagK/tagV respectively.
+                values.add(Lists.newArrayList(Lists.newArrayList("device.temp", System.currentTimeMillis(), new Random().nextLong(),
+                        Collections.singletonMap("loc.id", new Random().nextInt() % 64 + ""))));
+            }
+            batches.put(batchId, values);
+        }
+        for (List<Object> value : values) {
+            collector.emit(value);
+        }
+
+    }
+
+    @Override
+    public void ack(long batchId) {
+        batches.remove(batchId);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1);
+        return conf;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return MetricGenSpout.DEFAULT_METRIC_FIELDS;
+    }
+}
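
The batch spout above keeps each generated batch until it is acked, so a replay of the same batchId emits identical tuples. A minimal sketch of that caching pattern follows. ReplayableBatches is a hypothetical name and the batch content is reduced to plain integers. The sketch also uses Random.nextInt(64), which stays in [0, 64), whereas nextInt() % 64 in the spout can produce negative values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Illustrative only: caches generated batches by id until they are acked.
final class ReplayableBatches {
    private final Map<Long, List<Integer>> pending = new HashMap<>();
    private final Random random = new Random(); // one shared generator instead of one per value
    private final int batchSize;

    ReplayableBatches(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Returns the cached batch for this id, generating it on first request. */
    List<Integer> batchFor(long batchId) {
        return pending.computeIfAbsent(batchId, id -> {
            List<Integer> values = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                values.add(random.nextInt(64)); // always within [0, 64)
            }
            return values;
        });
    }

    /** Drops the batch once it has been fully processed. */
    void ack(long batchId) {
        pending.remove(batchId);
    }
}
```

Like the spout, this keeps unacked batches in memory only, so the replay guarantee does not survive a worker restart.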

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/MetricGenSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/MetricGenSpout.java b/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/MetricGenSpout.java
new file mode 100644
index 0000000..21af196
--- /dev/null
+++ b/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/MetricGenSpout.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Spout to generate tuples containing metric data.
+ */
+public class MetricGenSpout extends BaseRichSpout {
+
+    public static final Fields DEFAULT_METRIC_FIELDS =
+            new Fields(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getMetricField(),
+                    TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getTimestampField(),
+                    TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getValueField(),
+                    TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getTagsField());
+
+    private Map conf;
+    private TopologyContext context;
+    private SpoutOutputCollector collector;
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(DEFAULT_METRIC_FIELDS);
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.conf = conf;
+        this.context = context;
+        this.collector = collector;
+    }
+
+    @Override
+    public void nextTuple() {
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+        // tuple values are mapped with
+        // metric, timestamp, value, Map of tagK/tagV respectively.
+        collector.emit(Lists.newArrayList("device.temp", System.currentTimeMillis(), new Random().nextLong(),
+                Collections.singletonMap("loc.id", new Random().nextInt() % 64 + "")));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java b/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
new file mode 100644
index 0000000..6c511b8
--- /dev/null
+++ b/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import org.apache.storm.opentsdb.bolt.OpenTsdbBolt;
+import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
+import org.apache.storm.opentsdb.client.OpenTsdbClient;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+
+import java.util.Collections;
+
+/**
+ * Sample application to use OpenTSDB bolt.
+ */
+public class SampleOpenTsdbBoltTopology {
+
+    public static void main(String[] args) throws Exception {
+        if(args.length == 0) {
+            throw new IllegalArgumentException("There should be at least one argument. Run as `SampleOpenTsdbBoltTopology <tsdb-url>`");
+        }
+
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+        topologyBuilder.setSpout("metric-gen", new MetricGenSpout(), 5);
+
+        String openTsdbUrl = args[0];
+        OpenTsdbClient.Builder builder =  OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();
+        final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+        openTsdbBolt.withBatchSize(10).withFlushInterval(2).failTupleForFailedMetrics();
+        topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args.length > 1) {
+            conf.setNumWorkers(3);
+
+            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, topologyBuilder.createTopology());
+        } else {
+            conf.setMaxTaskParallelism(3);
+
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("opentsdb-bolt-sample", conf, topologyBuilder.createTopology());
+
+            Thread.sleep(30000);
+
+            cluster.shutdown();
+            System.exit(0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java b/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
new file mode 100644
index 0000000..db51a8a
--- /dev/null
+++ b/external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
+import org.apache.storm.opentsdb.client.OpenTsdbClient;
+import org.apache.storm.opentsdb.trident.OpenTsdbStateFactory;
+import org.apache.storm.opentsdb.trident.OpenTsdbStateUpdater;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.Consumer;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * Sample Trident topology that stores time series metrics in OpenTSDB.
+ */
+public class SampleOpenTsdbTridentTopology {
+    private static final Logger LOG = LoggerFactory.getLogger(SampleOpenTsdbTridentTopology.class);
+
+    public static void main(String[] args) throws Exception {
+        if(args.length == 0) {
+            throw new IllegalArgumentException("There should be at least one argument. Run as `SampleOpenTsdbTridentTopology <tsdb-url>`");
+        }
+
+        String tsdbUrl = args[0];
+
+
+        final OpenTsdbClient.Builder openTsdbClientBuilder = OpenTsdbClient.newBuilder(tsdbUrl);
+        final OpenTsdbStateFactory openTsdbStateFactory =
+                new OpenTsdbStateFactory(openTsdbClientBuilder,
+                        Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+
+        TridentTopology tridentTopology = new TridentTopology();
+        final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenBatchSpout(10));
+
+        stream.peek(new Consumer() {
+            @Override
+            public void accept(TridentTuple input) {
+                LOG.info("########### Received tuple: [{}]", input);
+            }
+        }).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater());
+
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args.length > 1) {
+            conf.setNumWorkers(3);
+
+            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, tridentTopology.build());
+        } else {
+            conf.setMaxTaskParallelism(3);
+
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("opentsdb-trident-sample", conf, tridentTopology.build());
+
+            Thread.sleep(30000);
+
+            cluster.shutdown();
+            System.exit(0);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 21418e0..e204218 100644
--- a/pom.xml
+++ b/pom.xml
@@ -294,6 +294,7 @@
         <module>examples/storm-starter</module>
         <module>storm-clojure</module>
         <module>external/storm-kafka-client</module>
+        <module>external/storm-opentsdb</module>
     </modules>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/28e2d489/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 648640e..cbe0103 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -331,7 +331,13 @@
                 <include>README.*</include>
             </includes>
         </fileSet>
-
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-opentsdb</directory>
+            <outputDirectory>external/storm-opentsdb</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
         <!-- $STORM_HOME/extlib -->
         <fileSet>
             <directory></directory>


[2/2] storm git commit: Added STORM-1893 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1893 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8ea9e5b9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8ea9e5b9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8ea9e5b9

Branch: refs/heads/master
Commit: 8ea9e5b9dfc8dbe2d2042e58bb499ba029d9776b
Parents: 28e2d48
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Jun 15 09:55:34 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Jun 15 09:55:34 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8ea9e5b9/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 869cdfb..974a565 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -105,6 +105,7 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.0
+ * STORM-1893: Support OpenTSDB for storing timeseries data.
  * STORM-1723: Introduce ClusterMetricsConsumer
  * STORM-1700: Introduce 'whitelist' / 'blacklist' option to MetricsConsumer
  * STORM-1698: Asynchronous MetricsConsumerBolt