You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/14 06:19:26 UTC

[1/2] storm git commit: Fix documentation code formatting

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 07c68d5a6 -> 04dc84608


Fix documentation code formatting

Code blocks should always follow an empty line; otherwise, jekyll will
fail to properly format the code block.

Also, github's fenced code blocks (with triple backticks) in the middle
of item lists cause incorrect list numbering.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6f79c1b1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6f79c1b1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6f79c1b1

Branch: refs/heads/1.x-branch
Commit: 6f79c1b1f9415eb002b57abd69838ee9b78d1e19
Parents: 07c68d5
Author: R�gis Behmo <re...@behmo.com>
Authored: Thu Apr 13 16:17:47 2017 +0200
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Apr 14 15:16:50 2017 +0900

----------------------------------------------------------------------
 docs/State-checkpointing.md  |  81 ++++++------
 docs/Trident-API-Overview.md |   6 +-
 docs/storm-cassandra.md      |  21 ++--
 docs/storm-jdbc.md           |   7 +-
 docs/storm-kafka-client.md   |  15 ++-
 docs/storm-kafka.md          | 258 ++++++++++++++++++++------------------
 docs/storm-kinesis.md        |   3 +
 docs/storm-redis.md          |  61 +++++++--
 docs/storm-solr.md           |  18 ++-
 9 files changed, 272 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6f79c1b1/docs/State-checkpointing.md
----------------------------------------------------------------------
diff --git a/docs/State-checkpointing.md b/docs/State-checkpointing.md
index 3d6212c..8789508 100644
--- a/docs/State-checkpointing.md
+++ b/docs/State-checkpointing.md
@@ -22,31 +22,30 @@ For example a word count bolt could use the key value state abstraction for the
 last committed by the framework during the previous run.
 3. In the execute method, update the word count.
 
- ```java
- public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
- private KeyValueState<String, Long> wordCounts;
- private OutputCollector collector;
- ...
-     @Override
-     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-       this.collector = collector;
-     }
-     @Override
-     public void initState(KeyValueState<String, Long> state) {
-       wordCounts = state;
-     }
-     @Override
-     public void execute(Tuple tuple) {
-       String word = tuple.getString(0);
-       Integer count = wordCounts.get(word, 0);
-       count++;
-       wordCounts.put(word, count);
-       collector.emit(tuple, new Values(word, count));
-       collector.ack(tuple);
-     }
- ...
- }
- ```
+    public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
+    private KeyValueState<String, Long> wordCounts;
+    private OutputCollector collector;
+    ...
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+          this.collector = collector;
+        }
+        @Override
+        public void initState(KeyValueState<String, Long> state) {
+          wordCounts = state;
+        }
+        @Override
+        public void execute(Tuple tuple) {
+          String word = tuple.getString(0);
+          Integer count = wordCounts.get(word, 0);
+          count++;
+          wordCounts.put(word, count);
+          collector.emit(tuple, new Values(word, count));
+          collector.ack(tuple);
+        }
+    ...
+    }
+
 4. The framework periodically checkpoints the state of the bolt (default every second). The frequency
 can be changed by setting the storm config `topology.state.checkpoint.interval.ms`
 5. For state persistence, use a state provider that supports persistence by setting the `topology.state.provider` in the
@@ -56,21 +55,21 @@ in the extlib directory.
 6. The state provider properties can be overridden by setting `topology.state.provider.config`. For Redis state this is a
 json config with the following properties.
 
- ```
- {
-   "keyClass": "Optional fully qualified class name of the Key type.",
-   "valueClass": "Optional fully qualified class name of the Value type.",
-   "keySerializerClass": "Optional Key serializer implementation class.",
-   "valueSerializerClass": "Optional Value Serializer implementation class.",
-   "jedisPoolConfig": {
-     "host": "localhost",
-     "port": 6379,
-     "timeout": 2000,
-     "database": 0,
-     "password": "xyz"
-     }
- }
- ```
+```
+{
+  "keyClass": "Optional fully qualified class name of the Key type.",
+  "valueClass": "Optional fully qualified class name of the Value type.",
+  "keySerializerClass": "Optional Key serializer implementation class.",
+  "valueSerializerClass": "Optional Value Serializer implementation class.",
+  "jedisPoolConfig": {
+    "host": "localhost",
+    "port": 6379,
+    "timeout": 2000,
+    "database": 0,
+    "password": "xyz"
+    }
+}
+```
 
 ## Checkpoint mechanism
 Checkpoint is triggered by an internal checkpoint spout at the specified `topology.state.checkpoint.interval.ms`. If there is
@@ -123,6 +122,7 @@ In order to provide the at-least once guarantee, all bolts in a stateful topolog
 
 ### IStateful bolt hooks
 IStateful bolt interface provides hook methods where in the stateful bolts could implement some custom actions.
+
 ```java
     /**
      * This is a hook for the component to perform some actions just before the
@@ -142,6 +142,7 @@ IStateful bolt interface provides hook methods where in the stateful bolts could
      */
     void preRollback();
 ```
+
 This is optional and stateful bolts are not expected to provide any implementation. This is provided so that other
 system level components can be built on top of the stateful abstractions where we might want to take some actions before the
 stateful bolt's state is prepared, committed or rolled back.

http://git-wip-us.apache.org/repos/asf/storm/blob/6f79c1b1/docs/Trident-API-Overview.md
----------------------------------------------------------------------
diff --git a/docs/Trident-API-Overview.md b/docs/Trident-API-Overview.md
index 309127d..ef743db 100644
--- a/docs/Trident-API-Overview.md
+++ b/docs/Trident-API-Overview.md
@@ -161,6 +161,7 @@ mystream.flatMap(new Split(), new Fields("word"))
  This could be useful for debugging to see the tuples as they flow past a certain point in a pipeline.
 
 For example, the below code would print the result of converting the words to uppercase before they are passed to `groupBy`
+
 ```java
  mystream.flatMap(new Split()).map(new UpperCase())
          .peek(new Consumer() {
@@ -206,14 +207,14 @@ Partition 2:
 [74,  37]
 [51,  49]
 [37,  98]
-
 ```
 
 `minBy` operation can be applied on the above stream of tuples like below which results in emitting tuples with minimum values of `count` field in each partition.
 
-``` java
+```java
   mystream.minBy(new Fields("count"))
 ```
+
 Result of the above code on mentioned partitions is:
  
 ```
@@ -227,7 +228,6 @@ Partition 1:
 
 Partition 2:
 [82,  23]
-
 ```
 
 You can look at other `min` and `minBy` operations on Stream

http://git-wip-us.apache.org/repos/asf/storm/blob/6f79c1b1/docs/storm-cassandra.md
----------------------------------------------------------------------
diff --git a/docs/storm-cassandra.md b/docs/storm-cassandra.md
index 47fabbd..61beb10 100644
--- a/docs/storm-cassandra.md
+++ b/docs/storm-cassandra.md
@@ -29,14 +29,14 @@ The following properties may be passed to storm configuration.
 ### CassandraWriterBolt
 
 ####Static import
-```java
 
+```java
 import static org.apache.storm.cassandra.DynamicStatementBuilder.*
-
 ```
 
 #### Insert Query Builder
 ##### Insert query including only the specified tuple fields.
+
 ```java
 
     new CassandraWriterBolt(
@@ -50,6 +50,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 ```
 
 ##### Insert query including all tuple fields.
+
 ```java
 
     new CassandraWriterBolt(
@@ -61,8 +62,8 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 ```
 
 ##### Insert multiple queries from one input tuple.
-```java
 
+```java
     new CassandraWriterBolt(
         async(
             simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
@@ -72,8 +73,8 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 ```
 
 ##### Insert query using QueryBuilder
-```java
 
+```java
     new CassandraWriterBolt(
         async(
             simpleQuery("INSERT INTO album (title,year,perfomer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
@@ -83,8 +84,8 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 ```
 
 ##### Insert query with static bound query
-```java
 
+```java
     new CassandraWriterBolt(
          async(
             boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
@@ -94,8 +95,8 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 ```
 
 ##### Insert query with static bound query using named setters and aliases
-```java
 
+```java
     new CassandraWriterBolt(
          async(
             boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
@@ -111,24 +112,24 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 ```
 
 ##### Insert query with bound statement load from storm configuration
-```java
 
+```java
     new CassandraWriterBolt(
          boundQuery(named("insertIntoAlbum"))
             .bind(all());
 ```
 
 ##### Insert query with bound statement load from tuple field
-```java
 
+```java
     new CassandraWriterBolt(
          boundQuery(namedByField("cql"))
             .bind(all());
 ```
 
 ##### Insert query with batch statement
-```java
 
+```java
     // Logged
     new CassandraWriterBolt(loggedBatch(
             simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
@@ -205,6 +206,7 @@ builder.setBolt("BOLT_WRITER", bolt, 4)
 
 ### Trident API support
 storm-cassandra support Trident `state` API for `inserting` data into Cassandra. 
+
 ```java
         CassandraState.Options options = new CassandraState.Options(new CassandraContext());
         CQLStatementTupleMapper insertTemperatureValues = boundQuery(
@@ -219,6 +221,7 @@ storm-cassandra support Trident `state` API for `inserting` data into Cassandra.
 ```
 
 Below `state` API for `querying` data from Cassandra.
+
 ```java
         CassandraState.Options options = new CassandraState.Options(new CassandraContext());
         CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")

http://git-wip-us.apache.org/repos/asf/storm/blob/6f79c1b1/docs/storm-jdbc.md
----------------------------------------------------------------------
diff --git a/docs/storm-jdbc.md b/docs/storm-jdbc.md
index 15aa2a3..66bedde 100644
--- a/docs/storm-jdbc.md
+++ b/docs/storm-jdbc.md
@@ -63,7 +63,7 @@ You can optionally specify a query timeout seconds param that specifies max seco
 The default is set to value of topology.message.timeout.secs and a value of -1 will indicate not to set any query timeout.
 You should set the query timeout value to be <= topology.message.timeout.secs.
 
- ```java
+```java
 Map hikariConfigMap = Maps.newHashMap();
 hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
 hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
@@ -81,7 +81,7 @@ JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simp
 JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                     .withInsertQuery("insert into user values (?,?)")
                                     .withQueryTimeoutSecs(30);                                    
- ```
+```
 
 ### SimpleJdbcMapper
 `storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm
@@ -112,6 +112,7 @@ method will return the columns in the order in which Jdbc connection instance's
 
 **If you specified your own insert query to `JdbcInsertBolt` you must initialize `SimpleJdbcMapper` with explicit columnschema such that the schema has columns in the same order as your insert queries.**
 For example if your insert query is `Insert into user (user_id, user_name) values (?,?)` then your `SimpleJdbcMapper` should be initialized with the following statements:
+
 ```java
 List<Column> columnSchema = Lists.newArrayList(
     new Column("user_id", java.sql.Types.INTEGER),
@@ -224,6 +225,7 @@ A runnable example can be found in the `src/test/java/topology` directory.
 * Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration.
 * The test topologies executes the following queries so your intended DB must support these queries for test topologies
 to work. 
+
 ```SQL
 create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
 create table if not exists department (dept_id integer, dept_name varchar(100));
@@ -272,6 +274,7 @@ You can generate a single jar with dependencies using mvn assembly plugin. To us
 ```
 
 Mysql Example:
+
 ```
 storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar org.apache.storm.jdbc.topology.UserPersistanceTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology
 ```

http://git-wip-us.apache.org/repos/asf/storm/blob/6f79c1b1/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index d1776d2..c687261 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -35,11 +35,13 @@ These should be specified while constructing an instance of FieldNameBasedTupleT
 
 ###KafkaTopicSelector and trident KafkaTopicSelector
 This interface has only one method
+
 ```java
 public interface KafkaTopicSelector {
     String getTopics(Tuple/TridentTuple tuple);
 }
 ```
+
 The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published
 You can return a null and the message will be ignored. If you have one static topic name then you can use
 DefaultTopicSelector.java and set the name of the topic in the constructor.
@@ -55,6 +57,7 @@ These are also defined in `org.apache.kafka.clients.producer.ProducerConfig`
 
 ###Using wildcard kafka topic match
 You can do a wildcard topic match by adding the following config
+
 ```
      Config config = new Config();
      config.put("kafka.topic.wildcard.match",true);
@@ -67,6 +70,7 @@ After this you can specify a wildcard topic for matching e.g. clickstream.*.log.
 ###Putting it all together
 
 For the bolt :
+
 ```java
         TopologyBuilder builder = new TopologyBuilder();
 
@@ -174,30 +178,30 @@ The API is written with java 8 lambda expressions in mind.  It works with java7
 
 #### Create a Simple Insecure Spout
 The following will consume all events published to "topic" and send them to MyBolt with the fields "topic", "partition", "offset", "key", "value".
+
 ```java
 
 final TopologyBuilder tp = new TopologyBuilder();
 tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic").build()), 1);
 tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
 ...
-
 ```
 
 #### Wildcard Topics
 Wildcard topics will consume from all topics that exist in the specified brokers list and match the pattern.  So in the following example
 "topic", "topic_foo" and "topic_bar" will all match the pattern "topic.*", but "not_my_topic" would not match. 
+
 ```java
 
 final TopologyBuilder tp = new TopologyBuilder();
 tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()), 1);
 tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
 ...
-
 ```
 
 #### Multiple Streams
+
 This uses java 8 lambda expressions.
-```java
 
 final TopologyBuilder tp = new TopologyBuilder();
 
@@ -212,7 +216,6 @@ tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:
 tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout", "STREAM_1");
 tp.setBolt("another", new myOtherBolt()).shuffleGrouping("kafka_spout", "STREAM_2");
 ...
-
 ```
 
 #### Trident
@@ -223,7 +226,6 @@ final Stream spoutStream = tridentTopology.newStream("kafkaSpout",
     new KafkaTridentSpoutOpaque<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()))
       .parallelismHint(1)
 ...
-
 ```
 
 Trident does not support multiple streams and will ignore any streams set for output.  If however the Fields are not identical for each
@@ -239,6 +241,7 @@ specific stream.  To do this you will need to return an instance of `org.apache.
 specific stream the tuple should go to.
 
 For Example:
+
 ```java
 return new KafkaTuple(1, 2, 3, 4).routedTo("bar");
 ```
@@ -261,6 +264,7 @@ please be careful when using these or implementing your own.
 ## Use the Maven Shade Plugin to Build the Uber Jar
 
 Add the following to `REPO_HOME/storm/external/storm-kafka-client/pom.xml`
+
 ```xml
 <plugin>
     <groupId>org.apache.maven.plugins</groupId>
@@ -363,6 +367,7 @@ To enable it, you need to:
 * enable *AutoCommitMode* in Kafka consumer configuration; 
 
 Here's one example to set AutoCommitMode in KafkaSpout:
+
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
 		.builder(String bootstrapServers, String ... topics)

http://git-wip-us.apache.org/repos/asf/storm/blob/6f79c1b1/docs/storm-kafka.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka.md b/docs/storm-kafka.md
index c69845d..d521d13 100644
--- a/docs/storm-kafka.md
+++ b/docs/storm-kafka.md
@@ -17,10 +17,12 @@ Currently, we support the following two implementations:
 ####ZkHosts
 ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses
 Kafka's ZooKeeper entries to track brokerHost -> partition mapping. You can instantiate an object by calling
+
 ```java
-    public ZkHosts(String brokerZkStr, String brokerZkPath)
-    public ZkHosts(String brokerZkStr)
+public ZkHosts(String brokerZkStr, String brokerZkPath)
+public ZkHosts(String brokerZkStr)
 ```
+
 Where brokerZkStr is just ip:port (e.g. localhost:2181). brokerZkPath is the root directory under which all the topics and
 partition information is stored. By default this is /brokers which is what the default Kafka implementation uses.
 
@@ -32,21 +34,22 @@ This is an alternative implementation where broker -> partition information is s
 of this class, you need to first construct an instance of GlobalPartitionInformation.
 
 ```java
-    Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
-    Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
-    Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
-    GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
-    partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
-    partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
-    partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
-    StaticHosts hosts = new StaticHosts(partitionInfo);
+Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
+Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
+Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
+GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
+partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
+partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
+partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
+StaticHosts hosts = new StaticHosts(partitionInfo);
 ```
 
 ###KafkaConfig
 The second thing needed for constructing a kafkaSpout is an instance of KafkaConfig.
+
 ```java
-    public KafkaConfig(BrokerHosts hosts, String topic)
-    public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
+public KafkaConfig(BrokerHosts hosts, String topic)
+public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
 ```
 
 The BrokerHosts can be any implementation of BrokerHosts interface as described above. The topic is name of Kafka topic.
@@ -57,47 +60,51 @@ There are 2 extensions of KafkaConfig currently in use.
 Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling
 behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset. The id should uniquely
 identify your spout.
+
 ```java
 public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
 public SpoutConfig(BrokerHosts hosts, String topic, String id);
 ```
+
 In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:
-```java
-    // setting for how often to save the current Kafka offset to ZooKeeper
-    public long stateUpdateIntervalMs = 2000;
-
-    // Retry strategy for failed messages
-    public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
-
-    // Exponential back-off retry settings.  These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
-    // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
-    // Initial delay between successive retries
-    public long retryInitialDelayMs = 0;
-    public double retryDelayMultiplier = 1.0;
-    
-    // Maximum delay between successive retries    
-    public long retryDelayMaxMs = 60 * 1000;
-    // Failed message will be retried infinitely if retryLimit is less than zero. 
-    public int retryLimit = -1;     
 
+```java
+// setting for how often to save the current Kafka offset to ZooKeeper
+public long stateUpdateIntervalMs = 2000;
+
+// Retry strategy for failed messages
+public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+// Exponential back-off retry settings.  These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+// calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
+// Initial delay between successive retries
+public long retryInitialDelayMs = 0;
+public double retryDelayMultiplier = 1.0;
+
+// Maximum delay between successive retries    
+public long retryDelayMaxMs = 60 * 1000;
+// Failed message will be retried infinitely if retryLimit is less than zero. 
+public int retryLimit = -1;     
 ```
+
 Core KafkaSpout only accepts an instance of SpoutConfig.
 
 TridentKafkaConfig is another extension of KafkaConfig.
 TridentKafkaEmitter only accepts TridentKafkaConfig.
 
 The KafkaConfig class also has bunch of public variables that controls your application's behavior. Here are defaults:
+
 ```java
-    public int fetchSizeBytes = 1024 * 1024;
-    public int socketTimeoutMs = 10000;
-    public int fetchMaxWait = 10000;
-    public int bufferSizeBytes = 1024 * 1024;
-    public MultiScheme scheme = new RawMultiScheme();
-    public boolean ignoreZkOffsets = false;
-    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-    public long maxOffsetBehind = Long.MAX_VALUE;
-    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
-    public int metricsTimeBucketSizeInSecs = 60;
+public int fetchSizeBytes = 1024 * 1024;
+public int socketTimeoutMs = 10000;
+public int fetchMaxWait = 10000;
+public int bufferSizeBytes = 1024 * 1024;
+public MultiScheme scheme = new RawMultiScheme();
+public boolean ignoreZkOffsets = false;
+public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+public long maxOffsetBehind = Long.MAX_VALUE;
+public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
+public int metricsTimeBucketSizeInSecs = 60;
 ```
 
 Most of them are self explanatory except MultiScheme.
@@ -106,8 +113,8 @@ MultiScheme is an interface that dictates how the ByteBuffer consumed from Kafka
 also controls the naming of your output field.
 
 ```java
-  public Iterable<List<Object>> deserialize(ByteBuffer ser);
-  public Fields getOutputFields();
+public Iterable<List<Object>> deserialize(ByteBuffer ser);
+public Fields getOutputFields();
 ```
 
 The default `RawMultiScheme` just takes the `ByteBuffer` and returns a tuple with the ByteBuffer converted to a `byte[]`. The name of the outputField is "bytes". There are alternative implementations like `SchemeAsMultiScheme` and `KeyValueSchemeAsMultiScheme` which can convert the `ByteBuffer` to `String`.
@@ -117,7 +124,6 @@ which has an additional deserialize method that accepts the message `ByteBuffer`
 
 ```java
 public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset)
-
 ```
 
 This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message of a discrete stream instead of persisting the entire message.
@@ -183,6 +189,7 @@ KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
 ```
 
 #### Trident Spout
+
 ```java
 TridentTopology topology = new TridentTopology();
 BrokerHosts zk = new ZkHosts("localhost");
@@ -227,21 +234,21 @@ When building a project with storm-kafka, you must explicitly add the Kafka depe
 use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your `pom.xml`:
 
 ```xml
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
-            <version>0.8.1.1</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
+<dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>kafka_2.10</artifactId>
+    <version>0.8.1.1</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
 ```
 
 Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.
@@ -249,11 +256,12 @@ Note that the ZooKeeper and log4j dependencies are excluded to prevent version c
 You can also override the kafka dependency version while building from maven, with parameter `storm.kafka.version` and `storm.kafka.artifact.id`
 e.g. `mvn clean install -Dstorm.kafka.artifact.id=kafka_2.11 -Dstorm.kafka.version=0.9.0.1`
 
-When selecting a kafka dependency version, you should ensure -
- 1. kafka api is compatible with storm-kafka. Currently, only 0.9.x and 0.8.x client API is supported by storm-kafka
- module. If you want to use a higher version, storm-kafka-client module should be used instead.
- 2. The kafka client selected by you should be wire compatible with the broker. e.g. 0.9.x client will not work with
- 0.8.x broker.
+When selecting a kafka dependency version, you should ensure - 
+
+1. kafka api is compatible with storm-kafka. Currently, only 0.9.x and 0.8.x client API is supported by storm-kafka 
+module. If you want to use a higher version, storm-kafka-client module should be used instead.
+2. The kafka client selected by you should be wire compatible with the broker. e.g. 0.9.x client will not work with 
+0.8.x broker. 
 
 
 ##Writing to Kafka as part of your topology
@@ -267,8 +275,8 @@ You need to provide implementation of following 2 interfaces
 These interfaces have 2 methods defined:
 
 ```java
-    K getKeyFromTuple(Tuple/TridentTuple tuple);
-    V getMessageFromTuple(Tuple/TridentTuple tuple);
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
 ```
 
 As the name suggests, these methods are called to map a tuple to Kafka key and Kafka message. If you just want one field
@@ -281,6 +289,7 @@ These should be specified while constructing and instance of FieldNameBasedTuple
 
 ###KafkaTopicSelector and trident KafkaTopicSelector
 This interface has only one method
+
 ```java
 public interface KafkaTopicSelector {
     String getTopics(Tuple/TridentTuple tuple);
@@ -300,10 +309,10 @@ Section "Important configuration properties for the producer" for more details.
 
 ###Using wildcard kafka topic match
 You can do a wildcard topic match by adding the following config
-```
-     Config config = new Config();
-     config.put("kafka.topic.wildcard.match",true);
 
+```java
+Config config = new Config();
+config.put("kafka.topic.wildcard.match",true);
 ```
 
 After this you can specify a wildcard topic for matching e.g. clickstream.*.log.  This will match all streams matching clickstream.my.log, clickstream.cart.log etc
@@ -312,64 +321,69 @@ After this you can specify a wildcard topic for matching e.g. clickstream.*.log.
 ###Putting it all together
 
 For the bolt :
+
 ```java
-        TopologyBuilder builder = new TopologyBuilder();
-
-        Fields fields = new Fields("key", "message");
-        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-                    new Values("storm", "1"),
-                    new Values("trident", "1"),
-                    new Values("needs", "1"),
-                    new Values("javadoc", "1")
-        );
-        spout.setCycle(true);
-        builder.setSpout("spout", spout, 5);
-        //set producer properties.
-        Properties props = new Properties();
-        props.put("bootstrap.servers", "localhost:9092");
-        props.put("acks", "1");
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-        KafkaBolt bolt = new KafkaBolt()
-                .withProducerProperties(props)
-                .withTopicSelector(new DefaultTopicSelector("test"))
-                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
-        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
-
-        Config conf = new Config();
-
-        StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
+TopologyBuilder builder = new TopologyBuilder();
+
+Fields fields = new Fields("key", "message");
+FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+            new Values("storm", "1"),
+            new Values("trident", "1"),
+            new Values("needs", "1"),
+            new Values("javadoc", "1")
+);
+spout.setCycle(true);
+builder.setSpout("spout", spout, 5);
+//set producer properties.
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");
+props.put("acks", "1");
+props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+KafkaBolt bolt = new KafkaBolt()
+        .withProducerProperties(props)
+        .withTopicSelector(new DefaultTopicSelector("test"))
+        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+
+Config conf = new Config();
+
+StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
 ```
 
 For Trident:
 
 ```java
-        Fields fields = new Fields("word", "count");
-        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-                new Values("storm", "1"),
-                new Values("trident", "1"),
-                new Values("needs", "1"),
-                new Values("javadoc", "1")
-        );
-        spout.setCycle(true);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        //set producer properties.
-        Properties props = new Properties();
-        props.put("bootstrap.servers", "localhost:9092");
-        props.put("acks", "1");
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
-                .withProducerProperties(props)
-                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
-                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
-        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
-
-        Config conf = new Config();
-        StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
-```
\ No newline at end of file
+Fields fields = new Fields("word", "count");
+FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+        new Values("storm", "1"),
+        new Values("trident", "1"),
+        new Values("needs", "1"),
+        new Values("javadoc", "1")
+);
+spout.setCycle(true);
+
+TridentTopology topology = new TridentTopology();
+Stream stream = topology.newStream("spout1", spout);
+
+//set producer properties.
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");
+props.put("acks", "1");
+props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+        .withProducerProperties(props)
+        .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
+
+Config conf = new Config();
+StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
+```
+
+## Committer Sponsors
+
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))

http://git-wip-us.apache.org/repos/asf/storm/blob/6f79c1b1/docs/storm-kinesis.md
----------------------------------------------------------------------
diff --git a/docs/storm-kinesis.md b/docs/storm-kinesis.md
index b23c10d..69c52bb 100644
--- a/docs/storm-kinesis.md
+++ b/docs/storm-kinesis.md
@@ -25,6 +25,7 @@ public class KinesisSpoutTopology {
     }
 }
 ```
+
 As you can see above the spout takes an object of KinesisConfig in its constructor. The constructor of KinesisConfig takes 8 objects as explained below.
 
 #### `String` streamName
@@ -38,6 +39,7 @@ will need to clear the state of zookeeper node used for storing sequence numbers
 #### `RecordToTupleMapper` recordToTupleMapper
 an implementation of `RecordToTupleMapper` interface that spout will call to convert a kinesis record to a storm tuple. It has two methods. getOutputFields 
 tells the spout the fields that will be present in the tuple emitted from the getTuple method. If getTuple returns null, the record will be acked
+
 ```java
     Fields getOutputFields ();
     List<Object> getTuple (Record record);
@@ -53,6 +55,7 @@ mechanism for failed messages. That implementation has two constructors. Default
 subsequent retires at Math.pow(2, i-1) where i is the retry number in the range 2 to LONG.MAX_LONG. 2 represents the base for exponential function in seconds. 
 Other constructor takes retry interval in millis for first retry as first argument, base for exponential function in seconds as second argument and number of 
 retries as third argument. The methods of this interface and its working in accord with the spout is explained below
+
 ```java
     boolean failed (KinesisMessageId messageId);
     KinesisMessageId getNextFailedMessageToRetry ();

http://git-wip-us.apache.org/repos/asf/storm/blob/6f79c1b1/docs/storm-redis.md
----------------------------------------------------------------------
diff --git a/docs/storm-redis.md b/docs/storm-redis.md
index 87541b9..1226e9b 100644
--- a/docs/storm-redis.md
+++ b/docs/storm-redis.md
@@ -25,18 +25,20 @@ use it as a maven dependency:
 
 ### For normal Bolt
 
-Storm-redis provides basic Bolt implementations, ```RedisLookupBolt``` and ```RedisStoreBolt```.
+Storm-redis provides basic Bolt implementations, `RedisLookupBolt` and `RedisStoreBolt`, and `RedisFilterBolt`.
 
-As name represents its usage, ```RedisLookupBolt``` retrieves value from Redis using key, and ```RedisStoreBolt``` stores key / value to Redis. One tuple will be matched to one key / value pair, and you can define match pattern to ```TupleMapper```.
+As name represents its usage, `RedisLookupBolt` retrieves value from Redis using key, and `RedisStoreBolt` stores key / value to Redis, and `RedisFilterBolt` filters out tuple which key or field doesn't exist on Redis.
 
-You can also choose data type from ```RedisDataTypeDescription``` to use. Please refer ```RedisDataTypeDescription.RedisDataType``` to see what data types are supported. In some data types (hash and sorted set), it requires additional key and converted key from tuple becomes element.
+One tuple will be matched to one key / value pair, and you can define match pattern to `TupleMapper`.
 
-These interfaces are combined with ```RedisLookupMapper``` and ```RedisStoreMapper``` which fit ```RedisLookupBolt``` and ```RedisStoreBolt``` respectively.
+You can also choose data type from `RedisDataTypeDescription` to use. Please refer `RedisDataTypeDescription.RedisDataType` to see what data types are supported. In some data types (hash and sorted set, and set if only RedisFilterBolt), it requires additional key and converted key from tuple becomes element.
+
+These interfaces are combined with `RedisLookupMapper` and `RedisStoreMapper` and `RedisFilterMapper` which fit `RedisLookupBolt` and `RedisStoreBolt`, and `RedisFilterBolt` respectively.
+(When you want to implement RedisFilterMapper, be sure to set declareOutputFields() to declare same fields to input stream, since FilterBolt forwards input tuples when they exist on Redis.)   
 
 #### RedisLookupBolt example
 
 ```java
-
 class WordCountRedisLookupMapper implements RedisLookupMapper {
     private RedisDataTypeDescription description;
     private final String hashKey = "wordCount";
@@ -74,21 +76,59 @@ class WordCountRedisLookupMapper implements RedisLookupMapper {
         return null;
     }
 }
-
 ```
 
 ```java
-
 JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
         .setHost(host).setPort(port).build();
 RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
 RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
 ```
 
-#### RedisStoreBolt example
+#### RedisFilterBolt example
 
 ```java
+class BlacklistWordFilterMapper implements RedisFilterMapper {
+    private RedisDataTypeDescription description;
+    private final String setKey = "blacklist";
+
+    public BlacklistWordFilterMapper() {
+        description = new RedisDataTypeDescription(
+                RedisDataTypeDescription.RedisDataType.SET, setKey);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
 
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        return description;
+    }
+
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        return tuple.getStringByField("word");
+    }
+
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        return null;
+    }
+}
+```
+
+```java
+JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+        .setHost(host).setPort(port).build();
+RedisFilterMapper filterMapper = new BlacklistWordFilterMapper();
+RedisFilterBolt filterBolt = new RedisFilterBolt(poolConfig, filterMapper);
+```
+
+#### RedisStoreBolt example
+
+```java
 class WordCountStoreMapper implements RedisStoreMapper {
     private RedisDataTypeDescription description;
     private final String hashKey = "wordCount";
@@ -116,7 +156,6 @@ class WordCountStoreMapper implements RedisStoreMapper {
 ```
 
 ```java
-
 JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                 .setHost(host).setPort(port).build();
 RedisStoreMapper storeMapper = new WordCountStoreMapper();
@@ -128,7 +167,6 @@ RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
 If your scenario doesn't fit ```RedisStoreBolt``` and ```RedisLookupBolt```, storm-redis also provides ```AbstractRedisBolt``` to let you extend and apply your business logic.
 
 ```java
-
     public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
         private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
         private static final Random RANDOM = new Random();
@@ -174,7 +212,6 @@ If your scenario doesn't fit ```RedisStoreBolt``` and ```RedisLookupBolt```, sto
             declarer.declare(new Fields("wordName", "count"));
         }
     }
-
 ```
 
 ### Trident State usage
@@ -184,6 +221,7 @@ If your scenario doesn't fit ```RedisStoreBolt``` and ```RedisLookupBolt```, sto
 2. RedisClusterState and RedisClusterMapState, which provide JedisCluster interface, just for redis cluster.
 
 RedisState
+
 ```java
         JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                         .setHost(redisHost).setPort(redisPort)
@@ -207,6 +245,7 @@ RedisState
 ```
 
 RedisClusterState
+
 ```java
         Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
         for (String hostPort : redisHostPort.split(",")) {

http://git-wip-us.apache.org/repos/asf/storm/blob/6f79c1b1/docs/storm-solr.md
----------------------------------------------------------------------
diff --git a/docs/storm-solr.md b/docs/storm-solr.md
index 4f5106d..7a0cb50 100644
--- a/docs/storm-solr.md
+++ b/docs/storm-solr.md
@@ -31,6 +31,7 @@ describe in detail the two key components of the Storm Solr integration, the `So
 ```
 
 ## Trident Topology With Fields Mapper
+
 ```java
     new SolrStateFactory(solrConfig, solrMapper);
     
@@ -73,7 +74,8 @@ be a String with the contents in JSON format, or a Java object that will be seri
  
 Code snippet illustrating how to create a `SolrJsonMapper` object to update the `gettingstarted` Solr collection with JSON content 
 declared in the tuple field with name "JSON"
-``` java
+
+```java
     SolrMapper solrMapper = new SolrJsonMapper.Builder("gettingstarted", "JSON").build();
 ```
 
@@ -96,7 +98,7 @@ Code snippet illustrating how to create a `SolrFieldsMapper` object to update th
 field separates each value with the token % instead of the default | . To use the default token you can ommit the call to the method
 `setMultiValueFieldToken`.
 
-``` java
+```java
     new SolrFieldsMapper.Builder(
             new RestJsonSchemaBuilder("localhost", "8983", "gettingstarted"), "gettingstarted")
                 .setMultiValueFieldToken("%").build();
@@ -115,8 +117,8 @@ and then generate an uber jar with all the dependencies.
 
  Add the following to `REPO_HOME/storm/external/storm-solr/pom.xml`
  
- ```
- <plugin>
+```xml
+<plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.1</version>
@@ -140,11 +142,15 @@ and then generate an uber jar with all the dependencies.
 
 create the uber jar by running the commmand:
 
-`mvn package -f REPO_HOME/storm/external/storm-solr/pom.xml`
+```
+mvn package -f REPO_HOME/storm/external/storm-solr/pom.xml
+```
 
 This will create the uber jar file with the name and location matching the following pattern:
  
-`REPO_HOME/storm/external/storm/target/storm-solr-0.11.0-SNAPSHOT.jar`
+```
+REPO_HOME/storm/external/storm/target/storm-solr-0.11.0-SNAPSHOT.jar
+```
 
 ## Run Examples
 Copy the file `REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT.jar` to `STORM_HOME/extlib`


[2/2] storm git commit: Merge branch 'pr-2068-1.x-merge' into 1.x-branch

Posted by ka...@apache.org.
Merge branch 'pr-2068-1.x-merge' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/04dc8460
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/04dc8460
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/04dc8460

Branch: refs/heads/1.x-branch
Commit: 04dc846080708cc202a06c51875a51bff4552369
Parents: 07c68d5 6f79c1b
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Apr 14 15:16:57 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Apr 14 15:16:57 2017 +0900

----------------------------------------------------------------------
 docs/State-checkpointing.md  |  81 ++++++------
 docs/Trident-API-Overview.md |   6 +-
 docs/storm-cassandra.md      |  21 ++--
 docs/storm-jdbc.md           |   7 +-
 docs/storm-kafka-client.md   |  15 ++-
 docs/storm-kafka.md          | 258 ++++++++++++++++++++------------------
 docs/storm-kinesis.md        |   3 +
 docs/storm-redis.md          |  61 +++++++--
 docs/storm-solr.md           |  18 ++-
 9 files changed, 272 insertions(+), 198 deletions(-)
----------------------------------------------------------------------