Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/03 01:10:45 UTC

[1/3] storm git commit: STORM-2209: Update documents adding new integration for some external systems

Repository: storm
Updated Branches:
  refs/heads/1.x-branch def89a229 -> f43dabfa8


STORM-2209: Update documents adding new integration for some external systems


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e08b1c5b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e08b1c5b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e08b1c5b

Branch: refs/heads/1.x-branch
Commit: e08b1c5b06a612d9435410ad1919adaf8fd085a6
Parents: 3e720c1
Author: Xin Wang <be...@163.com>
Authored: Sun Nov 20 13:49:29 2016 +0800
Committer: Xin Wang <be...@163.com>
Committed: Mon Nov 28 21:21:02 2016 +0800

----------------------------------------------------------------------
 docs/index.md              |  11 ++-
 docs/storm-cassandra.md    |  23 -----
 docs/storm-druid.md        | 119 +++++++++++++++++++++++++
 docs/storm-kafka-client.md | 188 ++++++++++++++++++++++++++++++++++++++++
 docs/storm-kinesis.md      | 136 +++++++++++++++++++++++++++++
 docs/storm-mongodb.md      |  23 -----
 docs/storm-opentsdb.md     |  52 +++++++++++
 docs/storm-redis.md        |  24 -----
 8 files changed, 505 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e08b1c5b/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 00b08a1..b0db36b 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -79,7 +79,7 @@ Trident is an alternative interface to Storm. It provides exactly-once processin
 * [Event Logging](Eventlogging.html)
 
 ### Integration With External Systems, and Other Libraries
-* [Apache Kafka Integration](storm-kafka.html)
+* [Apache Kafka Integration](storm-kafka.html), [New Kafka Consumer Integration](storm-kafka-client.html)
 * [Apache HBase Integration](storm-hbase.html)
 * [Apache HDFS Integration](storm-hdfs.html)
 * [Apache Hive Integration](storm-hive.html)
@@ -92,8 +92,17 @@ Trident is an alternative interface to Storm. It provides exactly-once processin
 * [Elasticsearch Integration](storm-elasticsearch.html)
 * [MQTT Integration](storm-mqtt.html)
 * [Mongodb Integration](storm-mongodb.html)
+* [OpenTSDB Integration](storm-opentsdb.html)
+* [Kinesis Integration](storm-kinesis.html)
+* [Druid Integration](storm-druid.html)
 * [Kestrel Integration](Kestrel-and-Storm.html)
 
+#### Container, Resource Management System Integration
+
+* [YARN Integration](https://github.com/yahoo/storm-yarn), [YARN Integration via Slider](http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_yarn_resource_mgt/content/ref-7d103a48-7c2e-4b7b-aab5-62c739a32ee0.1.html)
+* [Mesos Integration](https://github.com/mesos/storm)
+* [Docker Integration](https://hub.docker.com/_/storm/)
+
 ### Advanced
 
 * [Defining a non-JVM language DSL for Storm](Defining-a-non-jvm-language-dsl-for-storm.html)

http://git-wip-us.apache.org/repos/asf/storm/blob/e08b1c5b/docs/storm-cassandra.md
----------------------------------------------------------------------
diff --git a/docs/storm-cassandra.md b/docs/storm-cassandra.md
index c674fbc..47fabbd 100644
--- a/docs/storm-cassandra.md
+++ b/docs/storm-cassandra.md
@@ -230,26 +230,3 @@ Below `state` API for `querying` data from Cassandra.
         TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
         stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));         
 ```
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
- * Sriharha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))

http://git-wip-us.apache.org/repos/asf/storm/blob/e08b1c5b/docs/storm-druid.md
----------------------------------------------------------------------
diff --git a/docs/storm-druid.md b/docs/storm-druid.md
new file mode 100644
index 0000000..bed50dc
--- /dev/null
+++ b/docs/storm-druid.md
@@ -0,0 +1,119 @@
+# Storm Druid Bolt and TridentState
+
+This module provides core Storm and Trident bolt implementations for writing data to the [Druid](http://druid.io/) data store.
+This implementation uses Druid's [Tranquility library](https://github.com/druid-io/tranquility) to send messages to Druid.
+
+Some of the implementation details are borrowed from the existing [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md).
+This new bolt was added to support the latest Storm release and to maintain the bolt in the Storm repository.
+
+### Core Bolt
+The example below describes the usage of the core bolt, `org.apache.storm.druid.bolt.DruidBeamBolt`.
+By default this bolt expects to receive tuples in which the "event" field carries your event.
+This logic can be changed by implementing the `ITupleDruidEventMapper` interface; a sketch of a custom mapper follows the example.
+
+```java
+
+   DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+   DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
+   ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+   DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
+   topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
+   topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());
+
+```
+
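+If the default mapping does not fit your tuples, a custom mapper can be plugged in. The sketch below is illustrative only: it assumes, based on the default `TupleDruidEventMapper`, that `ITupleDruidEventMapper` declares a single `getEvent(ITuple)` method, and the `"payload"` field name is made up for this example.
+
+```java
+import java.util.Map;
+
+import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
+import org.apache.storm.tuple.ITuple;
+
+public class PayloadFieldEventMapper implements ITupleDruidEventMapper<Map<String, Object>> {
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Map<String, Object> getEvent(ITuple tuple) {
+        // Read the Druid event from a custom "payload" field instead of the default "event" field.
+        return (Map<String, Object>) tuple.getValueByField("payload");
+    }
+}
+```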
+
+### Trident State
+
+```java
+    DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+    ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+
+    final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
+
+    stream.peek(new Consumer() {
+        @Override
+        public void accept(TridentTuple input) {
+             LOG.info("########### Received tuple: [{}]", input);
+         }
+    }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
+
+```
+
+### Sample Beam Factory Implementation
+The Druid bolt must be supplied with a `BeamFactory`. You can implement one of these using the [DruidBeams builder's](https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) `buildBeam()` method.
+See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
+For more details, refer to the [Tranquility library](https://github.com/druid-io/tranquility) docs.
+
+```java
+
+public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
+
+    @Override
+    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
+
+
+        final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
+        final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
+        final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables.
+        final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
+        List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
+                new CountAggregatorFactory(
+                        "click"
+                )
+        );
+        // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
+        final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
+        {
+            @Override
+            public DateTime timestamp(Map<String, Object> theMap)
+            {
+                return new DateTime(theMap.get("timestamp"));
+            }
+        };
+
+        // Tranquility uses ZooKeeper (through Curator) for coordination.
+        final CuratorFramework curator = CuratorFrameworkFactory
+                .builder()
+                .connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf
+                .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
+                .build();
+        curator.start();
+
+        // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
+        // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
+        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
+
+        // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
+        // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.
+        // In this case, we won't provide one, so we're just using Jackson.
+        final Beam<Map<String, Object>> beam = DruidBeams
+                .builder(timestamper)
+                .curator(curator)
+                .discoveryPath(discoveryPath)
+                .location(DruidLocation.create(indexService, dataSource))
+                .timestampSpec(timestampSpec)
+                .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
+                .tuning(
+                        ClusteredBeamTuning
+                                .builder()
+                                .segmentGranularity(Granularity.HOUR)
+                                .windowPeriod(new Period("PT10M"))
+                                .partitions(1)
+                                .replicants(1)
+                                .build()
+                )
+                .druidBeamConfig(
+                      DruidBeamConfig
+                           .builder()
+                           .indexRetryPeriod(new Period("PT10M"))
+                           .build())
+                .buildBeam();
+
+        return beam;
+    }
+}
+
+```
+
+Example code is available [here](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid).

http://git-wip-us.apache.org/repos/asf/storm/blob/e08b1c5b/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
new file mode 100644
index 0000000..c8e038f
--- /dev/null
+++ b/docs/storm-kafka-client.md
@@ -0,0 +1,188 @@
+# Storm Kafka Spout with New Kafka Consumer API
+
+Apache Storm Spout implementation to consume data from Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version Compatibility](#compatibility)).
+
+The Apache Storm Spout allows clients to consume data from Kafka starting at offsets as defined by the offset strategy specified in `FirstPollOffsetStrategy`. 
+In case of failure, the Kafka Spout will re-start consuming messages from the offset that matches the chosen `FirstPollOffsetStrategy`.
+
+The Kafka Spout implementation allows you to specify the stream (`KafkaSpoutStream`) associated with each topic or topic wildcard. `KafkaSpoutStream` represents the stream and output fields. For named topics use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use `KafkaSpoutStreamsWildcardTopics`. 
+
+The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s from `ConsumerRecord`s. The logic is provided by the user through implementing the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use `KafkaSpoutTuplesBuilderWildcardTopics`.
+
+Multiple topics and topic wildcards can use the same `KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s from `ConsumerRecord`s is identical.
+
+
+# Usage Examples
+
+### Create a Kafka Spout
+
+The code snippet below is extracted from the example in the module [test](https://github.com/apache/storm/tree/master/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test). The code that is common for named topics and topic wildcards is in the first box. The specific implementations are in the appropriate section.
+
+These snippets serve as a reference and do not compile. If you would like to reuse this code in your implementation, please obtain it from the test module, where it is complete.
+
+```java
+KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
+
+KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
+        .setOffsetCommitPeriodMs(10_000)
+        .setFirstPollOffsetStrategy(EARLIEST)
+        .setMaxUncommittedOffsets(250)
+        .build();
+
+Map<String, Object> kafkaConsumerProps= new HashMap<>();
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS,"127.0.0.1:9092");
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.GROUP_ID,"kafkaSpoutTestGroup");
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
+
+KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
+        KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
+```
+
+### Named Topics
+```java
+KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})
+            .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
+            .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
+            .build();
+            
+KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
+            new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
+            new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
+            .build();
+            
+String[] STREAMS = new String[]{"test_stream", "test1_stream", "test2_stream"};
+String[] TOPICS = new String[]{"test", "test1", "test2"};
+
+Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
+Fields outputFields1 = new Fields("topic", "partition", "offset");
+```
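+
+The named-topics example above references tuple builders such as `TopicsTest0Test1TupleBuilder` without showing them. The sketch below indicates what such a builder could look like; it assumes `KafkaSpoutTupleBuilder` is an abstract class whose constructor takes the topics it applies to and which requires a `buildTuple(ConsumerRecord)` implementation producing values that match `outputFields`. Please refer to the test module for the complete, authoritative code.
+
+```java
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
+
+    public TopicsTest0Test1TupleBuilder(String... topics) {
+        super(topics);
+    }
+
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        // One tuple per record, matching outputFields = (topic, partition, offset, key, value).
+        return new Values(consumerRecord.topic(),
+                consumerRecord.partition(),
+                consumerRecord.offset(),
+                consumerRecord.key(),
+                consumerRecord.value());
+    }
+}
+```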
+
+### Topic Wildcards
+```java
+KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsWildcardTopics(
+            new KafkaSpoutStream(outputFields, STREAM, Pattern.compile(TOPIC_WILDCARD_PATTERN)));
+
+KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new TopicsTest0Test1TupleBuilder<>(TOPIC_WILDCARD_PATTERN);
+
+String STREAM = "test_wildcard_stream";
+String TOPIC_WILDCARD_PATTERN = "test[1|2]";
+
+Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
+```
+
+### Create a Simple Topology Using the Kafka Spout
+
+
+```java
+TopologyBuilder tp = new TopologyBuilder();
+tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
+tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+tp.createTopology();
+```
+
+# Build And Run Bundled Examples  
+To be able to run the examples you must first build the Java code in the `storm-kafka-client` module,
+and then generate an uber jar with all the dependencies.
+
+## Use the Maven Shade Plugin to Build the Uber Jar
+
+Add the following to `REPO_HOME/storm/external/storm-kafka-client/pom.xml`
+```xml
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-shade-plugin</artifactId>
+    <version>2.4.1</version>
+    <executions>
+        <execution>
+            <phase>package</phase>
+            <goals>
+                <goal>shade</goal>
+            </goals>
+            <configuration>
+                <transformers>
+                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                        <mainClass>org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain</mainClass>
+                    </transformer>
+                </transformers>
+            </configuration>
+        </execution>
+    </executions>
+</plugin>
+```
+
+Create the uber jar by running the command:
+
+`mvn package -f REPO_HOME/storm/external/storm-kafka-client/pom.xml`
+
+This will create the uber jar file with the name and location matching the following pattern:
+ 
+`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar`
+
+### Run Storm Topology
+
+Copy the file `REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar` to `STORM_HOME/extlib`
+
+Using the Kafka command line tools, create three topics [test, test1, test2] and use the Kafka console producer to populate the topics with some data.
+
+Execute the command `STORM_HOME/bin/storm jar REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain`
+
+With debug level logs enabled it is possible to see the messages of each topic being redirected to the appropriate bolt, as determined
+by the stream definitions and the choice of shuffle grouping.
+
+## Using storm-kafka-client with different versions of kafka
+
+The storm-kafka-client module's Kafka dependency is defined with `provided` scope in Maven, meaning it will not be pulled in
+as a transitive dependency. This allows you to use a Kafka client version compatible with your Kafka cluster.
+
+When building a project with storm-kafka-client, you must explicitly add the Kafka clients dependency. For example, to
+use Kafka-clients 0.10.0.0, you would use the following dependency in your `pom.xml`:
+
+```xml
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.0.0</version>
+        </dependency>
+```
+
+You can also override the kafka-clients version when building with Maven, using the parameter `storm.kafka.client.version`,
+e.g. `mvn clean install -Dstorm.kafka.client.version=0.10.0.0`
+
+When selecting a Kafka client version, you should ensure that:
+ 1. The Kafka API is compatible. The storm-kafka-client module only supports the **0.10 or newer** Kafka client API. For older versions,
+ you can use the storm-kafka module (https://github.com/apache/storm/tree/master/external/storm-kafka).
+ 2. The Kafka client you select is wire compatible with the broker, e.g. a 0.9.x client will not work with a
+ 0.8.x broker.
+
+# Kafka Spout Performance Tuning
+
+The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) methods [setOffsetCommitPeriodMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) and [setMaxUncommittedOffsets](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217).
+
+* "offset.commit.period.ms" controls how often the spout commits to Kafka
+* "max.uncommitted.offsets" controls how many offsets can be pending commit before another poll can take place
+<br/>
+
+The [Kafka consumer config](http://kafka.apache.org/documentation.html#consumerconfigs) parameters may also have an impact on the performance of the spout. The following Kafka parameters are likely the most influential on spout performance:
+
+* `fetch.min.bytes`
+* `fetch.max.wait.ms`
+* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) method [setPollTimeoutMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)
+
+Depending on the structure of your Kafka cluster, distribution of the data, and availability of data to poll, these parameters will have to be configured appropriately. Please refer to the Kafka documentation on Kafka parameter tuning.
+
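+As a rough reference, the sketch below shows where these knobs are set. It reuses the `kafkaSpoutStreams`, `tuplesBuilder`, and `retryService` objects from the earlier example; the values are illustrative only and should be tuned for your workload.
+
+```java
+Map<String, Object> kafkaConsumerProps = new HashMap<>();
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+// Kafka consumer level tuning parameters
+kafkaConsumerProps.put("fetch.min.bytes", "1");
+kafkaConsumerProps.put("fetch.max.wait.ms", "500");
+
+KafkaSpoutConfig<String, String> kafkaSpoutConfig =
+        new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
+                .setPollTimeoutMs(200)                // poll.timeout.ms
+                .setOffsetCommitPeriodMs(30_000)      // offset.commit.period.ms
+                .setMaxUncommittedOffsets(10_000_000) // max.uncommitted.offsets
+                .build();
+```
+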
+### Default values
+
+Currently the Kafka spout has the following default values, which have been shown to give good performance in the test environment described in this [blog post](https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/):
+
+* poll.timeout.ms = 200
+* offset.commit.period.ms = 30000   (30s)
+* max.uncommitted.offsets = 10000000
+
+There will be a blog post coming soon analyzing the trade-offs of these tuning parameters, and comparing the performance of the Kafka spouts using the Kafka client API introduced in 0.9 (new implementation) and in prior versions (prior implementation).
+
+# Future Work
+Implement comprehensive metrics. A Trident spout is coming soon.

http://git-wip-us.apache.org/repos/asf/storm/blob/e08b1c5b/docs/storm-kinesis.md
----------------------------------------------------------------------
diff --git a/docs/storm-kinesis.md b/docs/storm-kinesis.md
new file mode 100644
index 0000000..b23c10d
--- /dev/null
+++ b/docs/storm-kinesis.md
@@ -0,0 +1,136 @@
+# Storm Kinesis Spout
+Provides a core Storm spout for consuming data from a stream in Amazon Kinesis Streams. By default it stores the sequence numbers that can be committed in ZooKeeper and,
+on restart, starts consuming records after that sequence number. Below is a code sample that creates a sample topology using the spout. Each
+object used in configuring the spout is explained below. Ideally, the number of spout tasks should be equal to the number of shards in Kinesis. However, each task
+can read from more than one shard.
+
+```java
+public class KinesisSpoutTopology {
+    public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
+        String topologyName = args[0];
+        RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
+        KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
+                1000);
+        ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
+        KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
+                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
+        KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("spout", kinesisSpout, 3);
+        topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
+        Config topologyConfig = new Config();
+        topologyConfig.setDebug(true);
+        topologyConfig.setNumWorkers(3);
+        StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
+    }
+}
+```
+As you can see above, the spout takes a `KinesisConfig` object in its constructor. The constructor of `KinesisConfig` takes 8 arguments, as explained below.
+
+#### `String` streamName
+name of kinesis stream to consume data from
+
+#### `ShardIteratorType` shardIteratorType
+Three types are supported - TRIM_HORIZON (beginning of shard), LATEST and AT_TIMESTAMP. This argument is ignored if state for the shards
+is found in ZooKeeper, so it only applies the first time a topology is started. If you want to use any of these in subsequent runs of the topology, you
+will need to clear the state of the ZooKeeper node used for storing sequence numbers.
+
+#### `RecordToTupleMapper` recordToTupleMapper
+an implementation of the `RecordToTupleMapper` interface that the spout will call to convert a Kinesis record to a Storm tuple. It has two methods: `getOutputFields`
+tells the spout the fields that will be present in the tuple emitted from the `getTuple` method. If `getTuple` returns null, the record will be acked.
+```java
+    Fields getOutputFields ();
+    List<Object> getTuple (Record record);
+```
+
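+A minimal sketch of such a mapper is shown below. It is illustrative only: the output field names and the UTF-8 decoding are assumptions for this example, `Record` is the AWS SDK class `com.amazonaws.services.kinesis.model.Record`, and the `RecordToTupleMapper` interface is assumed to live in the storm-kinesis spout package.
+
+```java
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.apache.storm.kinesis.spout.RecordToTupleMapper;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class StringRecordToTupleMapper implements RecordToTupleMapper {
+
+    @Override
+    public Fields getOutputFields() {
+        // Fields of the tuples emitted via getTuple, in order.
+        return new Fields("partitionKey", "data");
+    }
+
+    @Override
+    public List<Object> getTuple(Record record) {
+        // Decode the record payload as a UTF-8 string; returning null instead would ack the record.
+        String data = StandardCharsets.UTF_8.decode(record.getData()).toString();
+        return new Values(record.getPartitionKey(), data);
+    }
+}
+```
+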
+#### `Date` timestamp
+used in conjunction with the AT_TIMESTAMP shardIteratorType argument. This will make the spout fetch records from Kinesis starting at that time or later. The
+time used by Kinesis is the server-side time associated with the record by Kinesis.
+
+#### `FailedMessageRetryHandler` failedMessageRetryHandler
+an implementation of the `FailedMessageRetryHandler` interface. By default this module provides an implementation that supports an exponential backoff retry
+mechanism for failed messages. That implementation has two constructors. The default no-args constructor will configure the first retry at 100 milliseconds and
+subsequent retries at Math.pow(2, i-1) seconds, where i is the retry number in the range 2 to Long.MAX_VALUE and 2 is the base of the exponential function, in seconds.
+The other constructor takes the retry interval in millis for the first retry as the first argument, the base of the exponential function in seconds as the second argument, and the number of
+retries as the third argument. The methods of this interface and how they work in concert with the spout are explained below.
+```java
+    boolean failed (KinesisMessageId messageId);
+    KinesisMessageId getNextFailedMessageToRetry ();
+    void failedMessageEmitted (KinesisMessageId messageId);
+    void acked (KinesisMessageId messageId);
+```
+`failed` will be called on every tuple that failed in the spout. It should return true if that failed message is scheduled to be retried, false otherwise.
+
+`getNextFailedMessageToRetry` will be called first every time the spout wants to emit a tuple. It should return a message that should be retried
+if there is one, or null otherwise. Note that it can return null if it does not have any message to retry at that moment. However, it should eventually
+return every message for which it returned true when `failed` was called for that message.
+
+`failedMessageEmitted` will be called if the spout successfully manages to get the record from Kinesis and emit it. If not, the implementation should return the same
+message when `getNextFailedMessageToRetry` is called again.
+
+`acked` will be called once the failed message has been re-emitted and successfully acked by the spout. If the re-emitted message fails again, `failed` will be called again.
+
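+For illustration, the non-default constructor described above could be used as follows when building the `KinesisConfig` from the earlier topology example. The argument types and values are assumptions based on the description; check `ExponentialBackoffRetrier` for the authoritative signature.
+
+```java
+// First retry after 50 ms, exponential base of 2 seconds, at most 100 retries (illustrative values).
+FailedMessageRetryHandler failedMessageRetryHandler = new ExponentialBackoffRetrier(50L, 2L, 100L);
+
+KinesisConfig kinesisConfig = new KinesisConfig(streamName, ShardIteratorType.TRIM_HORIZON,
+        recordToTupleMapper, new Date(), failedMessageRetryHandler, zkInfo, kinesisConnectionInfo, 10000L);
+```
+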
+#### `ZkInfo` zkInfo
+an object encapsulating information for ZooKeeper interaction. The constructor takes, in order:
+ 1. zkUrl: a comma separated string of ZooKeeper host:port pairs
+ 2. zkNode: the root node for storing committed sequence numbers
+ 3. session timeout, in milliseconds
+ 4. connection timeout, in milliseconds
+ 5. commit interval, in milliseconds, for committing sequence numbers to ZooKeeper
+ 6. retry attempts for ZooKeeper client connection retries
+ 7. retry interval, in milliseconds, to wait before retrying to connect
+
+#### `KinesisConnectionInfo` kinesisConnectionInfo
+an object that captures the arguments for connecting to Kinesis using the Kinesis client. Its constructor takes an implementation of `AWSCredentialsProvider`
+as the first argument. This module provides an implementation called `CredentialsProviderChain` that allows the spout to authenticate with Kinesis using one of
+the 5 mechanisms, in this order: `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`,
+`InstanceProfileCredentialsProvider`, `ProfileCredentialsProvider`. It takes a `ClientConfiguration` object as the second argument for configuring the Kinesis
+client, `Regions` as the third argument, which sets the region the client connects to, and recordsLimit as the fourth argument, which represents the maximum number
+of records the Kinesis client will retrieve for every GetRecords request. This limit should be chosen carefully based on the size of the records, Kinesis
+throughput rate limits, and per-tuple latency in Storm for the topology. If one task will be reading from more than one shard, that will also affect
+the choice of the limit argument.
+
+#### `Long` maxUncommittedRecords
+this represents the maximum number of uncommitted sequence numbers allowed per task. Once this number is reached the spout will not fetch any new records from
+Kinesis. Uncommitted sequence numbers are defined as the sum of all the messages for a task that have not been committed to ZooKeeper. This is different from
+topology level max pending messages. For example, suppose this value is set to 10, the spout emitted sequence numbers 1 to 10, sequence number 1 is pending,
+and 2 to 10 are acked. In that case the number of uncommitted sequence numbers is 10, since no sequence number in the range 1 to 10 can be committed to ZooKeeper.
+However, Storm can still call nextTuple on the spout because there is only 1 pending message.
+ 
+### Maven dependencies
+The AWS SDK version this module was tested with is 1.10.77.
+
+```xml
+ <dependencies>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <version>${aws-java-sdk.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+        </dependency>
+ </dependencies>
+```
+
+# Future Work
+Handle merging or splitting of shards in Kinesis, a Trident spout implementation, and metrics.

http://git-wip-us.apache.org/repos/asf/storm/blob/e08b1c5b/docs/storm-mongodb.md
----------------------------------------------------------------------
diff --git a/docs/storm-mongodb.md b/docs/storm-mongodb.md
index 90994bd..133bac3 100644
--- a/docs/storm-mongodb.md
+++ b/docs/storm-mongodb.md
@@ -174,26 +174,3 @@ To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mon
         //updateBolt.withUpsert(true);
  ```
 
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
-
- * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- 

http://git-wip-us.apache.org/repos/asf/storm/blob/e08b1c5b/docs/storm-opentsdb.md
----------------------------------------------------------------------
diff --git a/docs/storm-opentsdb.md b/docs/storm-opentsdb.md
new file mode 100644
index 0000000..11995ce
--- /dev/null
+++ b/docs/storm-opentsdb.md
@@ -0,0 +1,52 @@
+# Storm OpenTSDB Bolt and TridentState
+  
+OpenTSDB offers scalable and highly available storage for time series data. It consists of
+Time Series Daemon (TSD) servers along with command line utilities. Each TSD connects to the
+configured HBase cluster to push/query the data.
+
+A time series data point consists of the following (see the example after this list):
+ - a metric name.
+ - a UNIX timestamp (seconds or milliseconds since Epoch).
+ - a value (64 bit integer or single-precision floating point value).
+ - a set of tags (key-value pairs) that describe the time series the point belongs to.
+
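+For illustration, a single data point could look like the following, using OpenTSDB's telnet-style `put` syntax (the metric name, tags, and values are made up):
+
+```
+put sys.cpu.user 1479741600 42.5 host=web01 cpu=0
+```
+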
+The Storm bolt and Trident state create the above time series data points from a tuple, based on the given `TupleMetricPointMapper`.
+  
+This module provides core Storm and Trident bolt implementations for writing data to OpenTSDB. 
+
+Time series data points are written with an at-least-once guarantee, and duplicate data points should be handled as described [here](http://opentsdb.net/docs/build/html/user_guide/writing.html#duplicate-data-points) in the OpenTSDB documentation.
+
+## Examples
+
+### Core Bolt
+The example below describes the usage of the core bolt, `org.apache.storm.opentsdb.bolt.OpenTsdbBolt`.
+
+```java
+
+        OpenTsdbClient.Builder builder =  OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();
+        final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+        openTsdbBolt.withBatchSize(10).withFlushInterval(2000);
+        topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");
+        
+```
+
+
+### Trident State
+
+```java
+
+        final OpenTsdbStateFactory openTsdbStateFactory =
+                new OpenTsdbStateFactory(OpenTsdbClient.newBuilder(tsdbUrl),
+                        Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+        TridentTopology tridentTopology = new TridentTopology();
+        
+        final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenSpout());
+        
+        stream.peek(new Consumer() {
+            @Override
+            public void accept(TridentTuple input) {
+                LOG.info("########### Received tuple: [{}]", input);
+            }
+        }).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater());
+        
+```

http://git-wip-us.apache.org/repos/asf/storm/blob/e08b1c5b/docs/storm-redis.md
----------------------------------------------------------------------
diff --git a/docs/storm-redis.md b/docs/storm-redis.md
index adbac68..87541b9 100644
--- a/docs/storm-redis.md
+++ b/docs/storm-redis.md
@@ -232,27 +232,3 @@ RedisClusterState
                                 new RedisClusterStateQuerier(lookupMapper),
                                 new Fields("columnName","columnValue"));
 ```
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
-
- * Robert Evans ([@revans2](https://github.com/revans2))
- * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))


[3/3] storm git commit: Merge branch 'STORM-2209-1.x' of https://github.com/vesense/storm into STORM-2209-1.x-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2209-1.x' of https://github.com/vesense/storm into STORM-2209-1.x-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f43dabfa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f43dabfa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f43dabfa

Branch: refs/heads/1.x-branch
Commit: f43dabfa861eca276e3838194a85db4ce2144b40
Parents: def89a2 079af2d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Dec 3 10:10:20 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Dec 3 10:10:20 2016 +0900

----------------------------------------------------------------------
 docs/index.md              |  12 ++-
 docs/storm-cassandra.md    |  23 -----
 docs/storm-druid.md        | 119 +++++++++++++++++++++++++
 docs/storm-kafka-client.md | 188 ++++++++++++++++++++++++++++++++++++++++
 docs/storm-kinesis.md      | 136 +++++++++++++++++++++++++++++
 docs/storm-mongodb.md      |  23 -----
 docs/storm-opentsdb.md     |  52 +++++++++++
 docs/storm-redis.md        |  24 -----
 8 files changed, 506 insertions(+), 71 deletions(-)
----------------------------------------------------------------------



[2/3] storm git commit: Added kubernetes integration

Posted by ka...@apache.org.
Added kubernetes integration

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/079af2d3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/079af2d3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/079af2d3

Branch: refs/heads/1.x-branch
Commit: 079af2d3acd44e3646c9722bb1ee88e249c42ec3
Parents: e08b1c5
Author: Xin Wang <be...@163.com>
Authored: Wed Nov 23 16:29:44 2016 +0800
Committer: Xin Wang <be...@163.com>
Committed: Mon Nov 28 21:21:18 2016 +0800

----------------------------------------------------------------------
 docs/index.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/079af2d3/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index b0db36b..65e496b 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -102,6 +102,7 @@ Trident is an alternative interface to Storm. It provides exactly-once processin
 * [YARN Integration](https://github.com/yahoo/storm-yarn), [YARN Integration via Slider](http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_yarn_resource_mgt/content/ref-7d103a48-7c2e-4b7b-aab5-62c739a32ee0.1.html)
 * [Mesos Integration](https://github.com/mesos/storm)
 * [Docker Integration](https://hub.docker.com/_/storm/)
+* [Kubernetes Integration](https://github.com/kubernetes/kubernetes/tree/master/examples/storm)
 
 ### Advanced