Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/12 04:16:12 UTC

[07/10] storm git commit: STORM-1970: external project examples refactor

STORM-1970: external project examples refactor

* resolve conflict by Jungtaek Lim (kabhwan@gmail.com)
  * resolve version mismatch and adjust external modules added afterward


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/97fe209e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/97fe209e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/97fe209e

Branch: refs/heads/1.x-branch
Commit: 97fe209ee6c7d2abe6807732e03baa8950c768ea
Parents: bc0a1b8
Author: vesense <be...@163.com>
Authored: Thu Jul 14 17:13:39 2016 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Sep 12 13:09:35 2016 +0900

----------------------------------------------------------------------
 examples/storm-elasticsearch-examples/pom.xml   |  44 ++++
 .../elasticsearch/bolt/EsIndexTopology.java     | 120 +++++++++++
 .../storm/elasticsearch/common/EsConstants.java |  22 ++
 .../storm/elasticsearch/common/EsTestUtil.java  |  75 +++++++
 .../trident/TridentEsTopology.java              | 135 +++++++++++++
 examples/storm-hbase-examples/pom.xml           |  44 ++++
 .../storm/hbase/topology/LookupWordCount.java   |  79 ++++++++
 .../hbase/topology/PersistentWordCount.java     |  91 +++++++++
 .../storm/hbase/topology/TotalWordCounter.java  |  70 +++++++
 .../storm/hbase/topology/WordCountClient.java   |  57 ++++++
 .../hbase/topology/WordCountValueMapper.java    |  70 +++++++
 .../storm/hbase/topology/WordCounter.java       |  59 ++++++
 .../apache/storm/hbase/topology/WordSpout.java  |  88 ++++++++
 .../storm/hbase/trident/PrintFunction.java      |  40 ++++
 .../storm/hbase/trident/WordCountTrident.java   | 104 ++++++++++
 examples/storm-hdfs-examples/pom.xml            |  44 ++++
 .../storm/hdfs/bolt/HdfsFileTopology.java       | 196 ++++++++++++++++++
 .../storm/hdfs/bolt/SequenceFileTopology.java   | 202 +++++++++++++++++++
 .../storm/hdfs/trident/FixedBatchSpout.java     |  97 +++++++++
 .../storm/hdfs/trident/TridentFileTopology.java |  99 +++++++++
 .../hdfs/trident/TridentSequenceTopology.java   |  96 +++++++++
 examples/storm-hive-examples/pom.xml            |  44 ++++
 .../storm/hive/bolt/BucketTestHiveTopology.java | 189 +++++++++++++++++
 .../apache/storm/hive/bolt/HiveTopology.java    | 151 ++++++++++++++
 .../hive/bolt/HiveTopologyPartitioned.java      | 153 ++++++++++++++
 .../storm/hive/trident/TridentHiveTopology.java | 199 ++++++++++++++++++
 examples/storm-jdbc-examples/pom.xml            |  44 ++++
 .../org/apache/storm/jdbc/spout/UserSpout.java  |  90 +++++++++
 .../jdbc/topology/AbstractUserTopology.java     | 115 +++++++++++
 .../jdbc/topology/UserPersistanceTopology.java  |  62 ++++++
 .../UserPersistanceTridentTopology.java         |  61 ++++++
 examples/storm-kafka-examples/pom.xml           |  44 ++++
 .../storm/kafka/TridentKafkaTopology.java       |  91 +++++++++
 examples/storm-mongodb-examples/pom.xml         |  44 ++++
 .../storm/mongodb/topology/InsertWordCount.java |  81 ++++++++
 .../storm/mongodb/topology/UpdateWordCount.java |  91 +++++++++
 .../storm/mongodb/topology/WordCounter.java     |  67 ++++++
 .../storm/mongodb/topology/WordSpout.java       |  88 ++++++++
 .../storm/mongodb/trident/WordCountTrident.java |  85 ++++++++
 examples/storm-mqtt-examples/pom.xml            | 115 +++++++++++
 .../src/main/flux/sample.yaml                   |  62 ++++++
 .../src/main/flux/ssl-sample.yaml               |  78 +++++++
 .../mqtt/examples/CustomMessageMapper.java      |  49 +++++
 .../mqtt/examples/MqttBrokerPublisher.java      | 102 ++++++++++
 .../src/main/resources/log4j2.xml               |  32 +++
 examples/storm-opentsdb-examples/pom.xml        |  44 ++++
 .../storm/opentsdb/MetricGenBatchSpout.java     |  94 +++++++++
 .../apache/storm/opentsdb/MetricGenSpout.java   |  72 +++++++
 .../opentsdb/SampleOpenTsdbBoltTopology.java    |  70 +++++++
 .../opentsdb/SampleOpenTsdbTridentTopology.java |  87 ++++++++
 examples/storm-redis-examples/pom.xml           |  44 ++++
 .../storm/redis/topology/LookupWordCount.java   | 166 +++++++++++++++
 .../redis/topology/PersistentWordCount.java     | 116 +++++++++++
 .../redis/topology/WhitelistWordCount.java      | 155 ++++++++++++++
 .../storm/redis/topology/WordCounter.java       |  67 ++++++
 .../apache/storm/redis/topology/WordSpout.java  |  88 ++++++++
 .../storm/redis/trident/PrintFunction.java      |  40 ++++
 .../redis/trident/WordCountLookupMapper.java    |  57 ++++++
 .../redis/trident/WordCountStoreMapper.java     |  39 ++++
 .../redis/trident/WordCountTridentRedis.java    |  98 +++++++++
 .../trident/WordCountTridentRedisCluster.java   | 106 ++++++++++
 .../WordCountTridentRedisClusterMap.java        | 101 ++++++++++
 .../redis/trident/WordCountTridentRedisMap.java |  94 +++++++++
 examples/storm-solr-examples/pom.xml            |  44 ++++
 .../storm/solr/spout/SolrFieldsSpout.java       |  76 +++++++
 .../apache/storm/solr/spout/SolrJsonSpout.java  | 116 +++++++++++
 .../storm/solr/topology/SolrFieldsTopology.java |  56 +++++
 .../storm/solr/topology/SolrJsonTopology.java   |  48 +++++
 .../storm/solr/topology/SolrTopology.java       |  82 ++++++++
 .../solr/trident/SolrFieldsTridentTopology.java |  45 +++++
 .../solr/trident/SolrJsonTridentTopology.java   |  45 +++++
 .../org/apache/storm/solr/util/TestUtil.java    |  30 +++
 .../elasticsearch/bolt/EsIndexTopology.java     | 120 -----------
 .../trident/TridentEsTopology.java              | 135 -------------
 .../storm/hbase/topology/LookupWordCount.java   |  79 --------
 .../hbase/topology/PersistentWordCount.java     |  91 ---------
 .../storm/hbase/topology/TotalWordCounter.java  |  70 -------
 .../storm/hbase/topology/WordCountClient.java   |  57 ------
 .../hbase/topology/WordCountValueMapper.java    |  70 -------
 .../storm/hbase/topology/WordCounter.java       |  59 ------
 .../apache/storm/hbase/topology/WordSpout.java  |  88 --------
 .../storm/hbase/trident/PrintFunction.java      |  40 ----
 .../storm/hbase/trident/WordCountTrident.java   | 104 ----------
 .../storm/hdfs/bolt/HdfsFileTopology.java       | 196 ------------------
 .../storm/hdfs/bolt/SequenceFileTopology.java   | 202 -------------------
 .../storm/hdfs/trident/FixedBatchSpout.java     |  97 ---------
 .../storm/hdfs/trident/TridentFileTopology.java |  99 ---------
 .../hdfs/trident/TridentSequenceTopology.java   |  96 ---------
 .../storm/hive/bolt/BucketTestHiveTopology.java | 190 -----------------
 .../apache/storm/hive/bolt/HiveTopology.java    | 151 --------------
 .../hive/bolt/HiveTopologyPartitioned.java      | 153 --------------
 .../storm/hive/trident/TridentHiveTopology.java | 199 ------------------
 .../org/apache/storm/jdbc/spout/UserSpout.java  |  90 ---------
 .../jdbc/topology/AbstractUserTopology.java     | 115 -----------
 .../jdbc/topology/UserPersistanceTopology.java  |  62 ------
 .../UserPersistanceTridentTopology.java         |  61 ------
 .../storm/kafka/TridentKafkaTopology.java       |  91 ---------
 .../storm/mongodb/topology/InsertWordCount.java |  81 --------
 .../storm/mongodb/topology/UpdateWordCount.java |  91 ---------
 .../storm/mongodb/topology/WordCounter.java     |  67 ------
 .../storm/mongodb/topology/WordSpout.java       |  88 --------
 .../storm/mongodb/trident/WordCountTrident.java |  85 --------
 external/storm-mqtt/examples/pom.xml            | 115 -----------
 .../examples/src/main/flux/sample.yaml          |  62 ------
 .../examples/src/main/flux/ssl-sample.yaml      |  78 -------
 .../mqtt/examples/CustomMessageMapper.java      |  49 -----
 .../mqtt/examples/MqttBrokerPublisher.java      | 102 ----------
 .../examples/src/main/resources/log4j2.xml      |  32 ---
 external/storm-mqtt/pom.xml                     |   1 -
 .../storm/opentsdb/MetricGenBatchSpout.java     |  94 ---------
 .../apache/storm/opentsdb/MetricGenSpout.java   |  72 -------
 .../opentsdb/SampleOpenTsdbBoltTopology.java    |  70 -------
 .../opentsdb/SampleOpenTsdbTridentTopology.java |  87 --------
 .../storm/redis/topology/LookupWordCount.java   | 166 ---------------
 .../redis/topology/PersistentWordCount.java     | 116 -----------
 .../redis/topology/WhitelistWordCount.java      | 155 --------------
 .../storm/redis/topology/WordCounter.java       |  67 ------
 .../apache/storm/redis/topology/WordSpout.java  |  88 --------
 .../storm/redis/trident/PrintFunction.java      |  40 ----
 .../redis/trident/WordCountLookupMapper.java    |  57 ------
 .../redis/trident/WordCountStoreMapper.java     |  39 ----
 .../redis/trident/WordCountTridentRedis.java    |  98 ---------
 .../trident/WordCountTridentRedisCluster.java   | 106 ----------
 .../WordCountTridentRedisClusterMap.java        | 101 ----------
 .../redis/trident/WordCountTridentRedisMap.java |  94 ---------
 .../storm/solr/spout/SolrFieldsSpout.java       |  76 -------
 .../apache/storm/solr/spout/SolrJsonSpout.java  | 120 -----------
 .../storm/solr/topology/SolrFieldsTopology.java |  56 -----
 .../storm/solr/topology/SolrJsonTopology.java   |  48 -----
 .../storm/solr/topology/SolrTopology.java       |  82 --------
 .../solr/trident/SolrFieldsTridentTopology.java |  45 -----
 .../solr/trident/SolrJsonTridentTopology.java   |  45 -----
 .../org/apache/storm/solr/util/TestUtil.java    |  30 ---
 pom.xml                                         |  23 ++-
 134 files changed, 6068 insertions(+), 5522 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-elasticsearch-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/pom.xml b/examples/storm-elasticsearch-examples/pom.xml
new file mode 100644
index 0000000..eceb196
--- /dev/null
+++ b/examples/storm-elasticsearch-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-elasticsearch-examples</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-elasticsearch</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
new file mode 100644
index 0000000..d30424b
--- /dev/null
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsConstants;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class EsIndexTopology {
+
+    static final String SPOUT_ID = "spout";
+    static final String BOLT_ID = "bolt";
+    static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+        config.setNumWorkers(1);
+        TopologyBuilder builder = new TopologyBuilder();
+        UserDataSpout spout = new UserDataSpout();
+        builder.setSpout(SPOUT_ID, spout, 1);
+        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+        EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
+        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
+
+        EsTestUtil.startEsNode();
+        EsTestUtil.waitForSeconds(5);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+        EsTestUtil.waitForSeconds(20);
+        cluster.killTopology(TOPOLOGY_NAME);
+        System.out.println("cluster beginning to shut down");
+        cluster.shutdown();
+        System.out.println("cluster shut down");
+        System.exit(0);
+    }
+
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sources = {
+                "{\"user\":\"user1\"}",
+                "{\"user\":\"user2\"}",
+                "{\"user\":\"user3\"}",
+                "{\"user\":\"user4\"}"
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+        private String indexName = "index1";
+        private String typeName = "type1";
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("source", "index", "type", "id"));
+        }
+
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        public void nextTuple() {
+            String source = sources[index];
+            UUID msgId = UUID.randomUUID();
+            Values values = new Values(source, indexName, typeName, msgId);
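+            // Track the emitted tuple by message id so fail() can replay it (at-least-once).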
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sources.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if (count > 1000) {
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+            Thread.yield();
+        }
+
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        public void fail(Object msgId) {
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(this.pending.get(msgId), msgId);
+        }
+    }
+}
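
The topology wires in EsTestUtil.generateDefaultTupleMapper(), which reads
"source", "index", "type" and "id" straight from the tuple. A custom mapper is
just another implementation of the same extractors; a minimal sketch, assuming
EsTupleMapper exposes the four getters that DefaultEsTupleMapper implements
(the class name below is illustrative):

    import org.apache.storm.elasticsearch.common.EsTupleMapper;
    import org.apache.storm.tuple.ITuple;

    public class FixedTargetTupleMapper implements EsTupleMapper {
        @Override
        public String getSource(ITuple tuple) {
            return tuple.getStringByField("source");
        }

        @Override
        public String getIndex(ITuple tuple) {
            return "index1";   // pin the index instead of reading the tuple field
        }

        @Override
        public String getType(ITuple tuple) {
            return "type1";    // pin the type as well
        }

        @Override
        public String getId(ITuple tuple) {
            // UserDataSpout emits the id as a UUID object, so avoid getStringByField here.
            return tuple.getValueByField("id").toString();
        }
    }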

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsConstants.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsConstants.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsConstants.java
new file mode 100644
index 0000000..98bb71d
--- /dev/null
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsConstants.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+public class EsConstants {
+    public static final String clusterName = "test-cluster";
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
new file mode 100644
index 0000000..cb1c745
--- /dev/null
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+
+import java.util.HashMap;
+
+public class EsTestUtil {
+    public static Tuple generateTestTuple(String source, String index, String type, String id) {
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            @Override
+            public Fields getComponentOutputFields(String componentId, String streamId) {
+                return new Fields("source", "index", "type", "id");
+            }
+        };
+        return new TupleImpl(topologyContext, new Values(source, index, type, id), 1, "");
+    }
+
+    public static EsTupleMapper generateDefaultTupleMapper() {
+        return new DefaultEsTupleMapper();
+    }
+
+    public static Node startEsNode(){
+        Node node = NodeBuilder.nodeBuilder().data(true).settings(
+                ImmutableSettings.builder()
+                        .put(ClusterName.SETTING, EsConstants.clusterName)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .put(EsExecutors.PROCESSORS, 1)
+                        .put("http.enabled", false)
+                        .put("index.percolator.map_unmapped_fields_as_string", true)
+                        .put("index.store.type", "memory")
+        ).build();
+        node.start();
+        return node;
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000L);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
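
EsTestUtil.generateTestTuple builds a free-standing TupleImpl: the anonymous
GeneralTopologyContext pins the output fields, so field access works without a
running topology. A short usage sketch (class name illustrative):

    import org.apache.storm.elasticsearch.common.EsTestUtil;
    import org.apache.storm.tuple.Tuple;

    public class EsTestUtilDemo {
        public static void main(String[] args) {
            Tuple t = EsTestUtil.generateTestTuple(
                    "{\"user\":\"user1\"}", "index1", "type1", "id1");
            System.out.println(t.getStringByField("index"));   // index1
            System.out.println(t.getStringByField("source"));  // the JSON document
        }
    }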

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
new file mode 100644
index 0000000..67eab5b
--- /dev/null
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsConstants;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class TridentEsTopology {
+
+    static final String TOPOLOGY_NAME = "elasticsearch-test-topology2";
+
+    public static void main(String[] args) {
+        int batchSize = 100;
+        FixedBatchSpout spout = new FixedBatchSpout(batchSize);
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout", spout);
+        EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
+        Fields esFields = new Fields("index", "type", "source");
+        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+        StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
+        TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+
+        EsTestUtil.startEsNode();
+        EsTestUtil.waitForSeconds(5);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(TOPOLOGY_NAME, null, topology.build());
+        EsTestUtil.waitForSeconds(20);
+        cluster.killTopology(TOPOLOGY_NAME);
+        System.out.println("cluster beginning to shut down");
+        cluster.shutdown();
+        System.out.println("cluster shut down");
+        System.exit(0);
+    }
+
+    public static class FixedBatchSpout implements IBatchSpout {
+        int maxBatchSize;
+        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+        private Values[] outputs = {
+                new Values("{\"user\":\"user1\"}", "index1", "type1", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user2\"}", "index1", "type2", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user3\"}", "index2", "type1", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user4\"}", "index2", "type2", UUID.randomUUID().toString())
+        };
+        private int index = 0;
+        boolean cycle = false;
+
+        public FixedBatchSpout(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+        }
+
+        public void setCycle(boolean cycle) {
+            this.cycle = cycle;
+        }
+
+        @Override
+        public Fields getOutputFields() {
+            return new Fields("source", "index", "type", "id");
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context) {
+            index = 0;
+        }
+
+        @Override
+        public void emitBatch(long batchId, TridentCollector collector) {
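+            // Trident may re-request a failed batch: reuse the cached batch for a
+            // known batchId and only build a fresh one the first time the id is seen.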
+            List<List<Object>> batch = this.batches.get(batchId);
+            if (batch == null) {
+                batch = new ArrayList<List<Object>>();
+                if (index >= outputs.length && cycle) {
+                    index = 0;
+                }
+                for (int i = 0; i < maxBatchSize; index++, i++) {
+                    if (index == outputs.length) {
+                        index = 0;
+                    }
+                    batch.add(outputs[index]);
+                }
+                this.batches.put(batchId, batch);
+            }
+            for (List<Object> list : batch) {
+                collector.emit(list);
+            }
+        }
+
+        @Override
+        public void ack(long batchId) {
+            this.batches.remove(batchId);
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            Config conf = new Config();
+            conf.setMaxTaskParallelism(1);
+            return conf;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/pom.xml b/examples/storm-hbase-examples/pom.xml
new file mode 100644
index 0000000..ac5faaf
--- /dev/null
+++ b/examples/storm-hbase-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-hbase-examples</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hbase</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java
new file mode 100644
index 0000000..43f72ae
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.hbase.bolt.HBaseLookupBolt;
+import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
+import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class LookupWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+    private static final String TOTAL_COUNT_BOLT = "TOTAL_COUNT_BOLT";
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        Map<String, Object> hbConf = new HashMap<String, Object>();
+        if(args.length > 0){
+            hbConf.put("hbase.rootdir", args[0]);
+        }
+        config.put("hbase.conf", hbConf);
+
+        WordSpout spout = new WordSpout();
+        TotalWordCounter totalBolt = new TotalWordCounter();
+
+        SimpleHBaseMapper mapper = new SimpleHBaseMapper().withRowKeyField("word");
+        HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria();
+        projectionCriteria.addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"));
+
+        WordCountValueMapper rowToTupleMapper = new WordCountValueMapper();
+
+        HBaseLookupBolt hBaseLookupBolt = new HBaseLookupBolt("WordCount", mapper, rowToTupleMapper)
+                .withConfigKey("hbase.conf")
+                .withProjectionCriteria(projectionCriteria);
+
+        //wordspout -> lookupbolt -> totalCountBolt
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(LOOKUP_BOLT, hBaseLookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(TOTAL_COUNT_BOLT, totalBolt, 1).fieldsGrouping(LOOKUP_BOLT, new Fields("columnName"));
+
+        if (args.length == 1) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 2) {
+            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: LookupWordCount <hbase.rootdir> [topology name]");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java
new file mode 100644
index 0000000..cfb94d0
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.hbase.bolt.HBaseBolt;
+import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
+import org.apache.storm.hbase.security.HBaseSecurityUtil;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class PersistentWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String HBASE_BOLT = "HBASE_BOLT";
+
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        Map<String, Object> hbConf = new HashMap<String, Object>();
+        if(args.length > 0){
+            hbConf.put("hbase.rootdir", args[0]);
+        }
+        config.put("hbase.conf", hbConf);
+
+        WordSpout spout = new WordSpout();
+        WordCounter bolt = new WordCounter();
+
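+        // Row key is the word itself; "count" is declared as a counter field,
+        // so storm-hbase applies it as an HBase increment rather than a plain put.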
+        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+                .withRowKeyField("word")
+                .withColumnFields(new Fields("word"))
+                .withCounterFields(new Fields("count"))
+                .withColumnFamily("cf");
+
+        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
+                .withConfigKey("hbase.conf");
+
+
+        // wordSpout ==> countBolt ==> HBaseBolt
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+        if (args.length == 1) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 2) {
+            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+        } else if (args.length == 4) {
+            System.out.println("hdfs url: " + args[0] + ", keytab file: " + args[2] +
+                ", principal name: " + args[3] + ", topology name: " + args[1]);
+            hbConf.put(HBaseSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);
+            hbConf.put(HBaseSecurityUtil.STORM_USER_NAME_KEY, args[3]);
+            config.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: PersistentWordCount <hbase.rootdir> [topology name] [keytab file] [principal name]");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java
new file mode 100644
index 0000000..61b0dd8
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+public class TotalWordCounter implements IBasicBolt {
+
+    private BigInteger total = BigInteger.ZERO;
+    private static final Logger LOG = LoggerFactory.getLogger(TotalWordCounter.class);
+    private static final Random RANDOM = new Random();
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map stormConf, TopologyContext context) {
+    }
+
+    /*
+     * Accumulates a running total of the incoming counts and emits it.
+     */
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        total = total.add(new BigInteger(input.getValues().get(1).toString()));
+        collector.emit(tuple(total.toString()));
+        // Print the running total with low probability.
+        if(RANDOM.nextInt(1000) > 995) {
+            LOG.info("Running total = " + total);
+        }
+    }
+
+    public void cleanup() {
+        LOG.info("Final total = " + total);
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("total"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
new file mode 100644
index 0000000..33ce450
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Connects to the 'WordCount' table and prints counts for each word.
+ *
+ * Assumes you have run (or are running) <code>PersistentWordCount</code>
+ */
+public class WordCountClient {
+
+    public static void main(String[] args) throws Exception {
+        Configuration config = HBaseConfiguration.create();
+        if(args.length > 0){
+            config.set("hbase.rootdir", args[0]);
+        }
+
+        HTable table = new HTable(config, "WordCount");
+
+
+        for (String word : WordSpout.words) {
+            Get get = new Get(Bytes.toBytes(word));
+            Result result = table.get(get);
+
+            byte[] countBytes = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count"));
+            byte[] wordBytes = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("word"));
+
+            String wordStr = Bytes.toString(wordBytes);
+            long count = Bytes.toLong(countBytes);
+            System.out.println("Word: '" + wordStr + "', Count: " + count);
+        }
+
+        table.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountValueMapper.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
new file mode 100644
index 0000000..6c3301b
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Takes an HBase Result and returns a list of Values with one entry per column
+ * name and its corresponding value. So if the result from HBase was
+ * <pre>
+ * WORD, COUNT
+ * apple, 10
+ * banana, 20
+ * </pre>
+ *
+ * this will return
+ * <pre>
+ *     [WORD, apple]
+ *     [COUNT, 10]
+ *     [WORD, banana]
+ *     [COUNT, 20]
+ * </pre>
+ *
+ */
+public class WordCountValueMapper implements HBaseValueMapper {
+
+    @Override
+    public List<Values> toValues(ITuple tuple, Result result) throws Exception {
+        List<Values> values = new ArrayList<Values>();
+        Cell[] cells = result.rawCells();
+        for(Cell cell : cells) {
+            Values value = new Values (Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toLong(CellUtil.cloneValue(cell)));
+            values.add(value);
+        }
+        return values;
+    }
+
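+    // Each column/value pair becomes its own two-field tuple; LookupWordCount
+    // relies on this when it groups on the "columnName" field.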
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("columnName","columnValue"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java
new file mode 100644
index 0000000..3a350a8
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+public class WordCounter implements IBasicBolt {
+
+
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map stormConf, TopologyContext context) {
+    }
+
+    /*
+     * Just output the word value with a count of 1.
+     * The HBaseBolt will handle incrementing the counter.
+     */
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        collector.emit(tuple(input.getValues().get(0), 1));
+    }
+
+    public void cleanup() {
+
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java
new file mode 100644
index 0000000..c5fc490
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+public class WordSpout implements IRichSpout {
+    private static final Random RANDOM = new Random();
+    boolean isDistributed;
+    SpoutOutputCollector collector;
+    public static final String[] words = new String[] { "apple", "orange", "pineapple", "banana", "watermelon" };
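+    // Emits random words forever. ack() and fail() are no-ops, so despite the
+    // UUID message id failed tuples are never replayed (best-effort delivery).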
+
+    public WordSpout() {
+        this(true);
+    }
+
+    public WordSpout(boolean isDistributed) {
+        this.isDistributed = isDistributed;
+    }
+
+    public boolean isDistributed() {
+        return this.isDistributed;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+    }
+
+    public void close() {
+
+    }
+
+    public void nextTuple() {
+        // Reuse the shared Random rather than allocating one per call.
+        final String word = words[RANDOM.nextInt(words.length)];
+        this.collector.emit(new Values(word), UUID.randomUUID());
+        Thread.yield();
+    }
+
+    public void ack(Object msgId) {
+
+    }
+
+    public void fail(Object msgId) {
+
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+
+    @Override
+    public void activate() {
+    }
+
+    @Override
+    public void deactivate() {
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/PrintFunction.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/PrintFunction.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/PrintFunction.java
new file mode 100644
index 0000000..cdc7690
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/PrintFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.Random;
+
+public class PrintFunction extends BaseFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrintFunction.class);
+
+    private static final Random RANDOM = new Random();
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
+        if(RANDOM.nextInt(1000) > 995) {
+            LOG.info(tuple.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java
new file mode 100644
index 0000000..b2f0ce8
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
+import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
+import org.apache.storm.hbase.topology.WordCountValueMapper;
+import org.apache.storm.hbase.trident.mapper.SimpleTridentHBaseMapper;
+import org.apache.storm.hbase.trident.mapper.TridentHBaseMapper;
+import org.apache.storm.hbase.trident.state.HBaseQuery;
+import org.apache.storm.hbase.trident.state.HBaseState;
+import org.apache.storm.hbase.trident.state.HBaseStateFactory;
+import org.apache.storm.hbase.trident.state.HBaseUpdater;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+
+public class WordCountTrident {
+    public static StormTopology buildTopology(String hbaseRoot){
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        spout.setCycle(true);
+
+        TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()
+                .withColumnFamily("cf")
+                .withColumnFields(new Fields("word"))
+                .withCounterFields(new Fields("count"))
+                .withRowKeyField("word");
+
+        HBaseValueMapper rowToStormValueMapper = new WordCountValueMapper();
+
+        HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria();
+        projectionCriteria.addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"));
+
+        HBaseState.Options options = new HBaseState.Options()
+                .withConfigKey(hbaseRoot)
+                .withDurability(Durability.SYNC_WAL)
+                .withMapper(tridentHBaseMapper)
+                .withProjectionCriteria(projectionCriteria)
+                .withRowToStormValueMapper(rowToStormValueMapper)
+                .withTableName("WordCount");
+
+        StateFactory factory = new HBaseStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields, new HBaseUpdater(), new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"), new HBaseQuery(), new Fields("columnName", "columnValue"));
+        stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (args.length == 1) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("wordCounter", conf, buildTopology(args[0]));
+            Thread.sleep(60 * 1000);
+            cluster.killTopology("wordCounter");
+            cluster.shutdown();
+            System.exit(0);
+        }
+        else if (args.length == 2) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[1], conf, buildTopology(args[0]));
+        } else {
+            System.out.println("Usage: WordCountTrident [hbase config key] <topology name>");
+        }
+    }
+
+}
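
Note that the first program argument is a topology config key, not a URL: the HBase state factory resolves the HBase client settings from a Map stored under that key in the topology config. A minimal sketch of populating it before submission (the "hbase.conf" key name and rootdir value here are illustrative):

    Config conf = new Config();
    Map<String, Object> hbConf = new HashMap<String, Object>();
    hbConf.put("hbase.rootdir", "hdfs://localhost:8020/hbase"); // illustrative value
    conf.put("hbase.conf", hbConf);
    // then pass "hbase.conf" as the first program argument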

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hdfs-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/pom.xml b/examples/storm-hdfs-examples/pom.xml
new file mode 100644
index 0000000..0214ce1
--- /dev/null
+++ b/examples/storm-hdfs-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-hdfs-examples</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hdfs</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
new file mode 100644
index 0000000..b1ae542
--- /dev/null
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.format.RecordFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.MoveFileAction;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HdfsFileTopology {
+    static final String SENTENCE_SPOUT_ID = "sentence-spout";
+    static final String BOLT_ID = "my-bolt";
+    static final String TOPOLOGY_NAME = "test-topology";
+
+    public static void main(String[] args) throws Exception {
+        // fail fast with a usage message instead of an ArrayIndexOutOfBoundsException below
+        if (args.length < 2) {
+            System.out.println("Usage: HdfsFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+            System.exit(1);
+        }
+
+        Config config = new Config();
+        config.setNumWorkers(1);
+
+        SentenceSpout spout = new SentenceSpout();
+
+        // sync the filesystem after every 1k tuples
+        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+        // rotate files every minute
+        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES);
+
+        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+                .withPath("/tmp/foo/")
+                .withExtension(".txt");
+
+        // use "|" instead of "," for field delimiter
+        RecordFormat format = new DelimitedRecordFormat()
+                .withFieldDelimiter("|");
+
+        Yaml yaml = new Yaml();
+        InputStream in = new FileInputStream(args[1]);
+        Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
+        in.close();
+        config.put("hdfs.config", yamlConf);
+
+        HdfsBolt bolt = new HdfsBolt()
+                .withConfigKey("hdfs.config")
+                .withFsUrl(args[0])
+                .withFileNameFormat(fileNameFormat)
+                .withRecordFormat(format)
+                .withRotationPolicy(rotationPolicy)
+                .withSyncPolicy(syncPolicy)
+                .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/"));
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
+        // SentenceSpout --> HdfsBolt
+        builder.setBolt(BOLT_ID, bolt, 4)
+                .shuffleGrouping(SENTENCE_SPOUT_ID);
+
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+
+            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+            waitForSeconds(120);
+            cluster.killTopology(TOPOLOGY_NAME);
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 3) {
+            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: HdfsFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+        }
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so callers can observe it
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public static class SentenceSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sentences = {
+                "my dog has fleas",
+                "i like cold beverages",
+                "the dog ate my homework",
+                "don't have a cow man",
+                "i don't think i like fleas"
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("sentence", "timestamp"));
+        }
+
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        public void nextTuple() {
+            Values values = new Values(sentences[index], System.currentTimeMillis());
+            UUID msgId = UUID.randomUUID();
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sentences.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if(count > 20000){
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+            Thread.yield();
+        }
+
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        public void fail(Object msgId) {
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(this.pending.get(msgId), msgId);
+        }
+    }
+
+    public static class MyBolt extends BaseRichBolt {
+
+        private OutputCollector collector;
+
+        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        public void execute(Tuple tuple) {
+            collector.ack(tuple);
+        }
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            // this bolt does not emit anything
+        }
+
+        @Override
+        public void cleanup() {
+        }
+    }
+}
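
The yaml file named by the second argument is parsed into a plain Map, stored under the "hdfs.config" key, and used by storm-hdfs to populate the Hadoop Configuration. On an unsecured cluster an empty map is generally enough; a sketch of the secure-cluster equivalent built in code (keytab path and principal are illustrative):

    Map<String, Object> hdfsConf = new HashMap<String, Object>();
    hdfsConf.put("hdfs.keytab.file", "/etc/security/storm.keytab");  // illustrative path
    hdfsConf.put("hdfs.kerberos.principal", "storm@EXAMPLE.COM");    // illustrative principal
    config.put("hdfs.config", hdfsConf);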

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
new file mode 100644
index 0000000..86bc698
--- /dev/null
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.hdfs.bolt.format.*;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.MoveFileAction;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SequenceFileTopology {
+    static final String SENTENCE_SPOUT_ID = "sentence-spout";
+    static final String BOLT_ID = "my-bolt";
+    static final String TOPOLOGY_NAME = "test-topology";
+
+    public static void main(String[] args) throws Exception {
+        // fail fast with a usage message instead of an ArrayIndexOutOfBoundsException below
+        if (args.length < 2) {
+            System.out.println("Usage: SequenceFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+            System.exit(1);
+        }
+
+        Config config = new Config();
+        config.setNumWorkers(1);
+
+        SentenceSpout spout = new SentenceSpout();
+
+        // sync the filesystem after every 1k tuples
+        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+        // rotate files when they reach 5MB
+        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+
+        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+                .withPath("/tmp/source/")
+                .withExtension(".seq");
+
+        // create sequence format instance.
+        DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");
+
+        Yaml yaml = new Yaml();
+        InputStream in = new FileInputStream(args[1]);
+        Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
+        in.close();
+        config.put("hdfs.config", yamlConf);
+
+        SequenceFileBolt bolt = new SequenceFileBolt()
+                .withFsUrl(args[0])
+                .withConfigKey("hdfs.config")
+                .withFileNameFormat(fileNameFormat)
+                .withSequenceFormat(format)
+                .withRotationPolicy(rotationPolicy)
+                .withSyncPolicy(syncPolicy)
+                .withCompressionType(SequenceFile.CompressionType.RECORD)
+                .withCompressionCodec("deflate")
+                .addRotationAction(new MoveFileAction().toDestination("/tmp/dest/"));
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
+        // SentenceSpout --> SequenceFileBolt
+        builder.setBolt(BOLT_ID, bolt, 4)
+                .shuffleGrouping(SENTENCE_SPOUT_ID);
+
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+
+            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+            waitForSeconds(120);
+            cluster.killTopology(TOPOLOGY_NAME);
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 3) {
+            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: SequenceFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+        }
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so callers can observe it
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public static class SentenceSpout extends BaseRichSpout {
+
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sentences = {
+                "my dog has fleas",
+                "i like cold beverages",
+                "the dog ate my homework",
+                "don't have a cow man",
+                "i don't think i like fleas"
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("sentence", "timestamp"));
+        }
+
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        public void nextTuple() {
+            Values values = new Values(sentences[index], System.currentTimeMillis());
+            UUID msgId = UUID.randomUUID();
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sentences.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if(count > 20000){
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+            Thread.yield();
+        }
+
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        public void fail(Object msgId) {
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(this.pending.get(msgId), msgId);
+        }
+    }
+
+    public static class MyBolt extends BaseRichBolt {
+
+        private OutputCollector collector;
+
+        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        public void execute(Tuple tuple) {
+            collector.ack(tuple);
+        }
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            // this bolt does not emit anything
+        }
+
+        @Override
+        public void cleanup() {
+        }
+    }
+}
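
To spot-check what the topology wrote, the rotated .seq files can be read back with the plain Hadoop API. A sketch, assuming DefaultSequenceFormat's LongWritable/Text key-value pairs and a placeholder file name (imports: org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path, org.apache.hadoop.io.*):

    Configuration hadoopConf = new Configuration();
    SequenceFile.Reader reader = new SequenceFile.Reader(hadoopConf,
            SequenceFile.Reader.file(new Path("/tmp/source/my-bolt-0.seq"))); // substitute a real rotated file
    LongWritable key = new LongWritable();   // the "timestamp" field
    Text value = new Text();                 // the "sentence" field
    while (reader.next(key, value)) {
        System.out.println(key.get() + " | " + value);
    }
    reader.close();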

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java
new file mode 100644
index 0000000..76cc2aa
--- /dev/null
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FixedBatchSpout implements IBatchSpout {
+
+    Fields fields;
+    List<Object>[] outputs;
+    int maxBatchSize;
+    HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+
+    public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
+        this.fields = fields;
+        this.outputs = outputs;
+        this.maxBatchSize = maxBatchSize;
+    }
+
+    int index = 0;
+    boolean cycle = false;
+
+    public void setCycle(boolean cycle) {
+        this.cycle = cycle;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) {
+        index = 0;
+    }
+
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        List<List<Object>> batch = this.batches.get(batchId);
+        if (batch == null) {
+            batch = new ArrayList<List<Object>>();
+            for (int i = 0; i < maxBatchSize; index++, i++) {
+                if (index == outputs.length) {
+                    // honor the cycle flag: stop at the end of the fixed data unless cycling
+                    if (!cycle) {
+                        break;
+                    }
+                    index = 0;
+                }
+                batch.add(outputs[index]);
+            }
+            // cache the batch so a replayed batchId re-emits identical tuples
+            this.batches.put(batchId, batch);
+        }
+        for (List<Object> list : batch) {
+            collector.emit(list);
+        }
+    }
+
+    @Override
+    public void ack(long batchId) {
+        this.batches.remove(batchId);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1);
+        return conf;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return fields;
+    }
+}
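
With the cycle flag honored in emitBatch above, the same class doubles as a bounded test source. A minimal sketch:

    FixedBatchSpout testSpout = new FixedBatchSpout(new Fields("sentence", "key"), 2,
            new Values("hello storm", 1L),
            new Values("hello trident", 2L));
    testSpout.setCycle(false); // emit each value once, then produce empty batches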

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
new file mode 100644
index 0000000..8f75c45
--- /dev/null
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.hdfs.common.rotation.MoveFileAction;
+import org.apache.storm.hdfs.trident.format.*;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Map;
+
+public class TridentFileTopology {
+
+    public static StormTopology buildTopology(String hdfsUrl){
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence", "key"), 1000, new Values("the cow jumped over the moon", 1L),
+                new Values("the man went to the store and bought some candy", 2L), new Values("four score and seven years ago", 3L),
+                new Values("how many apples can you eat", 4L), new Values("to be or not to be the person", 5L));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        Fields hdfsFields = new Fields("sentence", "key");
+
+        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+                .withPath("/tmp/trident")
+                .withPrefix("trident")
+                .withExtension(".txt");
+
+        RecordFormat recordFormat = new DelimitedRecordFormat()
+                .withFields(hdfsFields);
+
+        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
+
+        HdfsState.Options options = new HdfsState.HdfsFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withRecordFormat(recordFormat)
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl(hdfsUrl)
+                .withConfigKey("hdfs.config");
+
+        StateFactory factory = new HdfsStateFactory().withOptions(options);
+
+        stream.partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
+
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        // fail fast with a usage message instead of an ArrayIndexOutOfBoundsException below
+        if (args.length < 2) {
+            System.out.println("Usage: TridentFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+            System.exit(1);
+        }
+
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+
+        Yaml yaml = new Yaml();
+        InputStream in = new FileInputStream(args[1]);
+        Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
+        in.close();
+        conf.put("hdfs.config", yamlConf);
+
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("wordCounter", conf, buildTopology(args[0]));
+            Thread.sleep(120 * 1000);
+            cluster.killTopology("wordCounter");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 3) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0]));
+        } else {
+            System.out.println("Usage: TridentFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+        }
+    }
+}