You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/12 04:16:10 UTC

[05/10] storm git commit: STORM-1970: external project examples refactor

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-mqtt-examples/src/main/flux/sample.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/src/main/flux/sample.yaml b/examples/storm-mqtt-examples/src/main/flux/sample.yaml
new file mode 100644
index 0000000..c2902dc
--- /dev/null
+++ b/examples/storm-mqtt-examples/src/main/flux/sample.yaml
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+# Flux topology definition: an MQTT spout wired to a logging bolt.
+# name to be used when submitting
+name: "mqtt-topology"
+
+components:
+  # MQTT spout config: reusable components, referenced from the spout via `ref`.
+  - id: "mqtt-type"
+    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+  - id: "mqtt-options"
+    className: "org.apache.storm.mqtt.common.MqttOptions"
+    properties:
+      - name: "url"
+        value: "tcp://localhost:1883"  # same host/port MqttBrokerPublisher binds its broker to
+      - name: "topics"
+        value:
+          - "/users/tgoetz/#"  # topic filter; covers the publisher's "/users/tgoetz/office/1234"
+
+# topology configuration
+config:
+  topology.workers: 1
+  topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+  - id: "mqtt-spout"
+    className: "org.apache.storm.mqtt.spout.MqttSpout"
+    constructorArgs:
+      - ref: "mqtt-type"     # the CustomMessageMapper component defined above
+      - ref: "mqtt-options"  # the MqttOptions component defined above
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+# stream definitions: spout output is shuffle-grouped to the logging bolt
+streams:
+  - from: "mqtt-spout"
+    to: "log"
+    grouping:
+      type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml b/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml
new file mode 100644
index 0000000..bfb668d
--- /dev/null
+++ b/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+# Flux topology definition: the MQTT sample with an SSL broker URL and a keystore loader.
+# name to be used when submitting
+name: "mqtt-topology"
+
+components:
+  # MQTT spout config: reusable components, referenced from the spout via `ref`.
+  - id: "mqtt-type"
+    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+  - id: "keystore-loader"
+    className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
+    constructorArgs:
+      - "keystore.jks"    # presumably the key store location - verify against DefaultKeyStoreLoader
+      - "truststore.jks"  # presumably the trust store location - verify against DefaultKeyStoreLoader
+    properties:  # placeholder passwords for the sample stores; never commit real secrets here
+      - name: "keyPassword"
+        value: "password"
+      - name: "keyStorePassword"
+        value: "password"
+      - name: "trustStorePassword"
+        value: "password"
+
+  - id: "mqtt-options"
+    className: "org.apache.storm.mqtt.common.MqttOptions"
+    properties:
+      - name: "url"
+        value: "ssl://raspberrypi.local:8883"  # example broker host; point this at your own SSL-enabled broker
+      - name: "topics"
+        value:
+          - "/users/tgoetz/#"  # MQTT topic filter passed to MqttOptions
+
+# topology configuration
+config:
+  topology.workers: 1
+  topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+  - id: "mqtt-spout"
+    className: "org.apache.storm.mqtt.spout.MqttSpout"
+    constructorArgs:
+      - ref: "mqtt-type"        # the CustomMessageMapper component defined above
+      - ref: "mqtt-options"     # the MqttOptions component defined above
+      - ref: "keystore-loader"  # the DefaultKeyStoreLoader component defined above
+    parallelism: 1
+
+# bolt definitions
+bolts:
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+# stream definitions: spout output is shuffle-grouped to the logging bolt
+streams:
+
+  - from: "mqtt-spout"
+    to: "log"
+    grouping:
+      type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
new file mode 100644
index 0000000..ec5645c
--- /dev/null
+++ b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.examples;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.MqttMessageMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Given a topic name: "/users/{user}/{location}/{deviceId}"
+ * and a payload of "{temperature}/{humidity}"
+ * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float)
+ *
+ */
+public class CustomMessageMapper implements MqttMessageMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(CustomMessageMapper.class);
+
+    /** Maps topic "/users/{user}/{location}/{deviceId}" and UTF-8 payload "{temperature}/{humidity}" to tuple values. */
+    public Values toValues(MqttMessage message) {
+        // The leading "/" makes element 0 empty: [1]="users", [2]=user, [3]=location, [4]=deviceId.
+        String[] topicElements = message.getTopic().split("/");
+        String[] payloadElements = new String(message.getMessage(), java.nio.charset.StandardCharsets.UTF_8).split("/");
+        return new Values(topicElements[2], topicElements[4], topicElements[3], Float.parseFloat(payloadElements[0]),
+                Float.parseFloat(payloadElements[1]));
+    }
+
+    /** Names the tuple fields in the order toValues() emits them. */
+    public Fields outputFields() {
+        return new Fields("user", "deviceId", "location", "temperature", "humidity");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java
new file mode 100644
index 0000000..fa8389d
--- /dev/null
+++ b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.examples;
+
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.storm.mqtt.MqttLogger;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/** Runs an embedded ActiveMQ broker with an MQTT connector and publishes random sample readings to it. */
+public class MqttBrokerPublisher {
+    private static final Logger LOG = LoggerFactory.getLogger(MqttBrokerPublisher.class);
+
+    private static BrokerService broker;
+    private static BlockingConnection connection;
+
+    /** Starts the broker on mqtt://localhost:1883 and registers a JVM shutdown hook that stops it. */
+    public static void startBroker() throws Exception {
+        LOG.info("Starting broker...");
+        broker = new BrokerService();
+        broker.addConnector("mqtt://localhost:1883");
+        broker.setDataDirectory("target");
+        broker.start();
+        LOG.info("MQTT broker started");
+        Runtime.getRuntime().addShutdownHook(new Thread(){
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Shutting down MQTT broker...");
+                    broker.stop();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+
+    /** Creates the MQTT client connection, registers a shutdown hook that disconnects it, then connects. */
+    public static void startPublisher() throws Exception {
+        MQTT client = new MQTT();
+        client.setTracer(new MqttLogger());
+        client.setHost("tcp://localhost:1883");
+        client.setClientId("MqttBrokerPublisher");
+        connection = client.blockingConnection();
+        Runtime.getRuntime().addShutdownHook(new Thread(){
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Shutting down MQTT client...");
+                    connection.disconnect();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        connection.connect();
+    }
+
+    /** Publishes a random "{temp}/{hum}" payload (each 0-99) every 500 ms in an endless loop. */
+    public static void publish() throws Exception {
+        String topic = "/users/tgoetz/office/1234";
+        Random rand = new Random();
+        LOG.info("Publishing to topic {}", topic);
+        LOG.info("Cntrl+C to exit.");
+        while(true) {
+            int temp = rand.nextInt(100);
+            int hum = rand.nextInt(100);
+            String payload = temp + "/" + hum;
+            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
+            connection.publish(topic, payload.getBytes(java.nio.charset.StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
+            Thread.sleep(500);
+        }
+    }
+
+    /** Entry point: starts the broker, connects the publisher, then publishes in an endless loop. */
+    public static void main(String[] args) throws Exception{
+        startBroker();
+        startPublisher();
+        publish();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-mqtt-examples/src/main/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/src/main/resources/log4j2.xml b/examples/storm-mqtt-examples/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..bfe57a1
--- /dev/null
+++ b/examples/storm-mqtt-examples/src/main/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+        </Console>
+    </Appenders>
+
+    <Loggers>
+        <Logger name="org.apache.storm.flux.wrappers" level="INFO"/>
+        <Logger name="org.apache.storm.mqtt" level="DEBUG"/>
+        <Root level="error">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-opentsdb-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/pom.xml b/examples/storm-opentsdb-examples/pom.xml
new file mode 100644
index 0000000..057efae
--- /dev/null
+++ b/examples/storm-opentsdb-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-opentsdb-examples</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-opentsdb</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java
new file mode 100644
index 0000000..b0580f6
--- /dev/null
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * BatchSpout implementation for metrics generation.
+ */
+public class MetricGenBatchSpout implements IBatchSpout {
+    /** Number of tuples generated for each new batch. */
+    private int batchSize;
+    /** Tuples generated per batch id, kept until ack() so a repeated emitBatch() call re-emits the same tuples. */
+    private final Map<Long, List<List<Object>>> batches = new HashMap<>();
+
+    public MetricGenBatchSpout(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) {
+        // nothing to initialize
+    }
+
+    /** Emits the tuples cached for batchId, generating and caching batchSize fresh ones on first request. */
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        List<List<Object>> values = batches.get(batchId);
+        if (values == null) {
+            values = new ArrayList<>();
+            Random random = new Random(); // one generator per batch instead of two per tuple
+            for (int i = 0; i < batchSize; i++) {
+                // tuple values are mapped with metric, timestamp, value, Map of tagK/tagV respectively.
+                values.add(Lists.newArrayList(Lists.newArrayList("device.temp", System.currentTimeMillis(), random.nextLong(),
+                        Collections.singletonMap("loc.id", random.nextInt() % 64 + ""))));
+            }
+            batches.put(batchId, values);
+        }
+        for (List<Object> value : values) {
+            collector.emit(value);
+        }
+    }
+
+    /** Discards the cached tuples of an acknowledged batch. */
+    @Override
+    public void ack(long batchId) {
+        batches.remove(batchId);
+    }
+
+    @Override
+    public void close() {
+        // nothing to release
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1);
+        return conf;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return MetricGenSpout.DEFAULT_METRIC_FIELDS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenSpout.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenSpout.java
new file mode 100644
index 0000000..21af196
--- /dev/null
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenSpout.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Spout to generate tuples containing metric data.
+ */
+public class MetricGenSpout extends BaseRichSpout {
+    public static final Fields DEFAULT_METRIC_FIELDS =
+            new Fields(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getMetricField(),
+                    TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getTimestampField(),
+                    TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getValueField(),
+                    TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getTagsField());
+
+    private Map conf;
+    private TopologyContext context;
+    private SpoutOutputCollector collector;
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(DEFAULT_METRIC_FIELDS);
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.conf = conf;
+        this.context = context;
+        this.collector = collector;
+    }
+
+    /** Waits 100 ms, then emits one "device.temp" datapoint; emits nothing if the wait is interrupted. */
+    @Override
+    public void nextTuple() {
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the calling thread
+            return;
+        }
+        // tuple values are mapped with metric, timestamp, value, Map of tagK/tagV respectively.
+        collector.emit(Lists.newArrayList("device.temp", System.currentTimeMillis(), new Random().nextLong(),
+                Collections.singletonMap("loc.id", new Random().nextInt() % 64 + "")));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
new file mode 100644
index 0000000..6c511b8
--- /dev/null
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import org.apache.storm.opentsdb.bolt.OpenTsdbBolt;
+import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
+import org.apache.storm.opentsdb.client.OpenTsdbClient;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+
+import java.util.Collections;
+
+/**
+ * Sample application to use OpenTSDB bolt.
+ */
+public class SampleOpenTsdbBoltTopology {
+    /**
+     * Wires MetricGenSpout into an OpenTsdbBolt and submits the topology.
+     * @param args args[0]: OpenTSDB URL (required). args[1]: optional topology name; when present the topology
+     *             is submitted via StormSubmitter, otherwise it runs in a LocalCluster for 30 seconds.
+     * @throws IllegalArgumentException if no arguments are supplied
+     */
+    public static void main(String[] args) throws Exception {
+        if(args.length == 0) {
+            throw new IllegalArgumentException("There should be at least one argument. Run as `SampleOpenTsdbBoltTopology <tsdb-url> [topology-name]`");
+        }
+
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("metric-gen", new MetricGenSpout(), 5);
+
+        String openTsdbUrl = args[0];
+        OpenTsdbClient.Builder builder =  OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();
+        final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+        openTsdbBolt.withBatchSize(10).withFlushInterval(2).failTupleForFailedMetrics();
+        topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args.length > 1) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, topologyBuilder.createTopology());
+        } else {
+            conf.setMaxTaskParallelism(3);
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("opentsdb-bolt-sample", conf, topologyBuilder.createTopology());
+            Thread.sleep(30000); // let the local topology run for 30 seconds
+            cluster.shutdown();
+            System.exit(0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
new file mode 100644
index 0000000..db51a8a
--- /dev/null
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
+import org.apache.storm.opentsdb.client.OpenTsdbClient;
+import org.apache.storm.opentsdb.trident.OpenTsdbStateFactory;
+import org.apache.storm.opentsdb.trident.OpenTsdbStateUpdater;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.Consumer;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * Sample Trident topology that stores time series metrics in OpenTSDB.
+ */
+public class SampleOpenTsdbTridentTopology {
+    private static final Logger LOG = LoggerFactory.getLogger(SampleOpenTsdbTridentTopology.class);
+
+    /**
+     * Builds a Trident stream from MetricGenBatchSpout that logs each tuple and persists it via OpenTsdbStateFactory.
+     * @param args args[0]: OpenTSDB URL (required). args[1]: optional topology name; when present the topology
+     *             is submitted via StormSubmitter, otherwise it runs in a LocalCluster for 30 seconds.
+     * @throws IllegalArgumentException if no arguments are supplied
+     */
+    public static void main(String[] args) throws Exception {
+        if(args.length == 0) {
+            throw new IllegalArgumentException("There should be at least one argument. Run as `SampleOpenTsdbTridentTopology <tsdb-url> [topology-name]`");
+        }
+
+        String tsdbUrl = args[0];
+        final OpenTsdbClient.Builder openTsdbClientBuilder = OpenTsdbClient.newBuilder(tsdbUrl);
+        final OpenTsdbStateFactory openTsdbStateFactory =
+                new OpenTsdbStateFactory(openTsdbClientBuilder,
+                        Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+
+        TridentTopology tridentTopology = new TridentTopology();
+        final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenBatchSpout(10));
+
+        // Log every tuple as it passes, then hand it to the OpenTSDB state updater.
+        stream.peek(new Consumer() {
+            @Override
+            public void accept(TridentTuple input) {
+                LOG.info("########### Received tuple: [{}]", input);
+            }
+        }).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater());
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args.length > 1) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, tridentTopology.build());
+        } else {
+            conf.setMaxTaskParallelism(3);
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("opentsdb-trident-sample", conf, tridentTopology.build());
+
+            Thread.sleep(30000); // let the local topology run for 30 seconds
+            cluster.shutdown();
+            System.exit(0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/pom.xml b/examples/storm-redis-examples/pom.xml
new file mode 100644
index 0000000..91a8660
--- /dev/null
+++ b/examples/storm-redis-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-redis-examples</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-redis</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
new file mode 100644
index 0000000..f62b7b0
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.bolt.RedisLookupBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/** Example topology: WordSpout -> RedisLookupBolt (HASH data type, "wordCount") -> sampled logging of the results. */
+public class LookupWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+    private static final String PRINT_BOLT = "PRINT_BOLT";
+
+    private static final String TEST_REDIS_HOST = "127.0.0.1";
+    private static final int TEST_REDIS_PORT = 6379;
+
+    /** Terminal bolt: acks every tuple and logs roughly 0.4% of the "wordName"/"count" pairs it receives. */
+    public static class PrintWordTotalCountBolt extends BaseRichBolt {
+        private static final Logger LOG = LoggerFactory.getLogger(PrintWordTotalCountBolt.class);
+        private static final Random RANDOM = new Random();
+        private OutputCollector collector;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            String wordName = input.getStringByField("wordName");
+            String countStr = input.getStringByField("count");
+
+            // print lookup result with low probability
+            if (RANDOM.nextInt(1000) > 995) {
+                // A missing (null) count is reported as 0.
+                int count = (countStr != null) ? Integer.parseInt(countStr) : 0;
+                LOG.info("Lookup result - word : {} / count : {}", wordName, count);
+            }
+
+            collector.ack(input);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+        String host = TEST_REDIS_HOST;
+        int port = TEST_REDIS_PORT;
+        if (args.length >= 2) {
+            host = args[0];
+            port = Integer.parseInt(args[1]);
+        }
+
+        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder().setHost(host).setPort(port).build();
+
+        WordSpout spout = new WordSpout();
+        RedisLookupMapper lookupMapper = setupLookupMapper();
+        RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
+
+        PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt();
+
+        //wordspout -> lookupbolt
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(LOOKUP_BOLT);
+        // Two arguments: 30 second local run. Three: submit under the given name. Otherwise: print usage.
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology("test", config, builder.createTopology());
+                Thread.sleep(30000);
+                cluster.killTopology("test");
+            } finally {
+                // Stop the local cluster even when the run fails part-way.
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else if (args.length == 3) {
+            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: LookupWordCount <redis host> <redis port> (topology name)");
+        }
+    }
+
+    private static RedisLookupMapper setupLookupMapper() {
+        return new WordCountRedisLookupMapper();
+    }
+
+    private static class WordCountRedisLookupMapper implements RedisLookupMapper {
+        private RedisDataTypeDescription description;
+        private final String hashKey = "wordCount";
+
+        public WordCountRedisLookupMapper() {
+            description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+        }
+
+        @Override
+        public List<Values> toTuple(ITuple input, Object value) {
+            String member = getKeyFromTuple(input);
+            List<Values> values = Lists.newArrayList();
+            values.add(new Values(member, value));
+            return values;
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("wordName", "count"));
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            return tuple.getStringByField("word");
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
new file mode 100644
index 0000000..d46bab6
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.redis.bolt.AbstractRedisBolt;
+import org.apache.storm.redis.bolt.RedisStoreBolt;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisException;
+
+/** Example topology: WordSpout -> WordCounter -> RedisStoreBolt (HASH data type, "wordCount"). */
+public class PersistentWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String STORE_BOLT = "STORE_BOLT";
+
+    private static final String TEST_REDIS_HOST = "127.0.0.1";
+    private static final int TEST_REDIS_PORT = 6379;
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+        String host = TEST_REDIS_HOST;
+        int port = TEST_REDIS_PORT;
+        if (args.length >= 2) {
+            host = args[0];
+            port = Integer.parseInt(args[1]);
+        }
+
+        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder().setHost(host).setPort(port).build();
+
+        WordSpout spout = new WordSpout();
+        WordCounter bolt = new WordCounter();
+        RedisStoreMapper storeMapper = setupStoreMapper();
+        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
+
+        // wordSpout ==> countBolt ==> RedisBolt
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word"));
+        builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT);
+        // Two arguments: 30 second local run. Three: submit under the given name. Otherwise: print usage.
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology("test", config, builder.createTopology());
+                Thread.sleep(30000);
+                cluster.killTopology("test");
+            } finally {
+                // Stop the local cluster even when the run fails part-way.
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else if (args.length == 3) {
+            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");
+        }
+    }
+
+    private static RedisStoreMapper setupStoreMapper() {
+        return new WordCountStoreMapper();
+    }
+
+    private static class WordCountStoreMapper implements RedisStoreMapper {
+        private RedisDataTypeDescription description;
+        private final String hashKey = "wordCount";
+
+        public WordCountStoreMapper() {
+            description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            return tuple.getStringByField("word");
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            return tuple.getStringByField("count");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
new file mode 100644
index 0000000..bcb2e0b
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.redis.bolt.RedisFilterBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisFilterMapper;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Random;
+
+/** Example topology: WordSpout -> RedisFilterBolt (SET data type, "whitelist") -> WordCounter -> sampled logging. */
+public class WhitelistWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String WHITELIST_BOLT = "WHITELIST_BOLT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String PRINT_BOLT = "PRINT_BOLT";
+
+    private static final String TEST_REDIS_HOST = "127.0.0.1";
+    private static final int TEST_REDIS_PORT = 6379;
+
+    /** Terminal bolt: acks every tuple and logs roughly 0.4% of the "word"/"count" pairs it receives. */
+    public static class PrintWordTotalCountBolt extends BaseRichBolt {
+        private static final Logger LOG = LoggerFactory.getLogger(PrintWordTotalCountBolt.class);
+        private static final Random RANDOM = new Random();
+        private OutputCollector collector;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            String wordName = input.getStringByField("word");
+            String countStr = input.getStringByField("count");
+
+            // print lookup result with low probability
+            if (RANDOM.nextInt(1000) > 995) {
+                // A missing (null) count is reported as 0.
+                int count = (countStr != null) ? Integer.parseInt(countStr) : 0;
+                LOG.info("Count result - word : {} / count : {}", wordName, count);
+            }
+
+            collector.ack(input);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+        String host = TEST_REDIS_HOST;
+        int port = TEST_REDIS_PORT;
+        if (args.length >= 2) {
+            host = args[0];
+            port = Integer.parseInt(args[1]);
+        }
+
+        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder().setHost(host).setPort(port).build();
+
+        WordSpout spout = new WordSpout();
+        RedisFilterMapper filterMapper = setupWhitelistMapper();
+        RedisFilterBolt whitelistBolt = new RedisFilterBolt(poolConfig, filterMapper);
+        WordCounter wordCounterBolt = new WordCounter();
+        PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt();
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(WHITELIST_BOLT, whitelistBolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(COUNT_BOLT, wordCounterBolt, 1).fieldsGrouping(WHITELIST_BOLT, new Fields("word"));
+        builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(COUNT_BOLT);
+        // Two arguments: 30 second local run. Three: submit under the given name. Otherwise: print usage.
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology("test", config, builder.createTopology());
+                Thread.sleep(30000);
+                cluster.killTopology("test");
+            } finally {
+                // Stop the local cluster even when the run fails part-way.
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else if (args.length == 3) {
+            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: WhitelistWordCount <redis host> <redis port> (topology name)");
+        }
+    }
+
+    private static RedisFilterMapper setupWhitelistMapper() {
+        return new WhitelistWordFilterMapper();
+    }
+
+    private static class WhitelistWordFilterMapper implements RedisFilterMapper {
+        private RedisDataTypeDescription description;
+        private final String setKey = "whitelist";
+
+        public WhitelistWordFilterMapper() {
+            description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.SET, setKey);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            return tuple.getStringByField("word");
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java
new file mode 100644
index 0000000..6fa930c
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+/** Bolt that keeps a running per-word count in memory and emits ("word", "count") with the count as a string. */
+public class WordCounter implements IBasicBolt {
+    private Map<String, Integer> wordCounter = Maps.newHashMap();
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void prepare(Map stormConf, TopologyContext context) {
+    }
+
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        String word = input.getStringByField("word");
+        // One lookup decides the new count; a word not seen before starts at 1.
+        Integer previous = wordCounter.get(word);
+        int count = (previous == null) ? 1 : previous + 1;
+
+        wordCounter.put(word, count);
+        collector.emit(new Values(word, String.valueOf(count)));
+    }
+
+    @Override
+    public void cleanup() {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        // No component-specific configuration.
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java
new file mode 100644
index 0000000..e2cdfde
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.topology;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/** Spout that emits one randomly chosen entry of {@link #words} per nextTuple() call, tagged with a fresh UUID message id. */
+public class WordSpout implements IRichSpout {
+    private static final Random RANDOM = new Random(); // shared, instead of a new Random per emitted tuple
+    boolean isDistributed;
+    SpoutOutputCollector collector;
+    public static final String[] words = new String[] { "apple", "orange", "pineapple", "banana", "watermelon" };
+
+    public WordSpout() {
+        this(true);
+    }
+
+    public WordSpout(boolean isDistributed) {
+        this.isDistributed = isDistributed;
+    }
+
+    public boolean isDistributed() {
+        return this.isDistributed;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+    }
+
+    public void close() {
+    }
+
+    public void nextTuple() {
+        final String word = words[RANDOM.nextInt(words.length)];
+        this.collector.emit(new Values(word), UUID.randomUUID());
+        Thread.yield();
+    }
+
+    public void ack(Object msgId) {
+        // Nothing is tracked per message id, so acks need no handling.
+    }
+
+    public void fail(Object msgId) {
+        // Failed words are not replayed.
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+
+    @Override
+    public void activate() {
+    }
+
+    @Override
+    public void deactivate() {
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/PrintFunction.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/PrintFunction.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/PrintFunction.java
new file mode 100644
index 0000000..37d3936
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/PrintFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.Random;
+
+/** Trident function that logs about 0.4% of the tuples it receives and never emits anything. */
+public class PrintFunction extends BaseFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(PrintFunction.class);
+    private static final Random RANDOM = new Random();
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
+        final boolean sampled = RANDOM.nextInt(1000) > 995; // true only for draws 996..999
+        if (sampled) {
+            LOG.info(tuple.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountLookupMapper.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
new file mode 100644
index 0000000..a6ca8c9
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class WordCountLookupMapper implements RedisLookupMapper { // lookup mapper used by the Trident Redis examples
+    @Override
+    public List<Values> toTuple(ITuple input, Object value) {
+        List<Values> values = new ArrayList<Values>();
+        values.add(new Values(getKeyFromTuple(input), value));
+        return values;
+    }
+    // Declares the two output fields, "word" and "value", for the rows built by toTuple.
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "value"));
+    }
+    // Builds a new HASH description with "test" on every call.
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "test");
+    }
+    // Key is the tuple's first field prefixed with "test_".
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        return "test_" + tuple.getString(0);
+    }
+    // NOTE(review): needs a non-null Integer at index 1; confirm lookup input tuples carry one or that lookups never call this.
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        return tuple.getInteger(1).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
new file mode 100644
index 0000000..58df150
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+
+/**
+ * {@link RedisStoreMapper} used by the Trident Redis word-count examples.
+ *
+ * <p>Maps a tuple to a ("test_" + field 0, field 1 as text) key/value pair and
+ * describes its data as a HASH constructed with "test".
+ */
+public class WordCountStoreMapper implements RedisStoreMapper {
+    /**
+     * Derives the Redis key from a tuple.
+     *
+     * @param tuple tuple whose field 0 is read as a String
+     * @return "test_" followed by that field
+     */
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        String word = tuple.getString(0);
+        return "test_" + word;
+    }
+
+    /**
+     * Derives the Redis value from a tuple.
+     *
+     * @param tuple tuple whose field 1 is read as an Integer
+     * @return the decimal string form of that Integer
+     */
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        Integer count = tuple.getInteger(1);
+        return count.toString();
+    }
+
+    /**
+     * @return a description selecting the HASH data type, constructed with
+     *         "test" as its second argument
+     */
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        RedisDataTypeDescription.RedisDataType hashType = RedisDataTypeDescription.RedisDataType.HASH;
+        return new RedisDataTypeDescription(hashType, "test");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
new file mode 100644
index 0000000..e3eb0f9
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateQuerier;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+
+/**
+ * Example Trident topology that persists a fixed stream of (word, count)
+ * tuples through {@link RedisState} and queries the stored values back.
+ *
+ * <p>Command line: mode flag (0 = local cluster, 1 = remote submission),
+ * Redis host, Redis port.
+ */
+public class WordCountTridentRedis {
+    /** Name used to submit the topology and, in local mode, to kill it again. */
+    private static final String TOPOLOGY_NAME = "test_wordCounter_for_redis";
+
+    /** Help text printed whenever the command-line arguments cannot be used. */
+    private static final String USAGE =
+            "Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port";
+
+    /**
+     * Builds the example topology against a single Redis server.
+     *
+     * @param redisHost host of the Redis server
+     * @param redisPort port of the Redis server
+     * @return the assembled topology
+     */
+    public static StormTopology buildTopology(String redisHost, Integer redisPort){
+        Fields fields = new Fields("word", "count");
+        // Four fixed (word, count) tuples; the 4 is presumably the maximum
+        // batch size - confirm against FixedBatchSpout.
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        // Presumably makes the spout replay its fixed tuples endlessly - confirm.
+        spout.setCycle(true);
+
+        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+                                        .setHost(redisHost).setPort(redisPort)
+                                        .build();
+
+        RedisStoreMapper storeMapper = new WordCountStoreMapper();
+        RedisLookupMapper lookupMapper = new WordCountLookupMapper();
+        RedisState.Factory factory = new RedisState.Factory(poolConfig);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        // Write path: hand every (word, count) tuple to the store mapper.
+        // NOTE(review): withExpire appears to take seconds in storm-redis, which
+        // would make 86400000 roughly 1000 days rather than one day - confirm
+        // the intended time to live before changing it.
+        stream.partitionPersist(factory,
+                                fields,
+                                new RedisStateUpdater(storeMapper).withExpire(86400000),
+                                new Fields());
+
+        // Read path: look each word up again and print the returned value.
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                                new RedisStateQuerier(lookupMapper),
+                                new Fields("columnName","columnValue"));
+        stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
+        return topology.build();
+    }
+
+    /**
+     * Runs the example in a local cluster for 60 seconds (flag 0) or submits
+     * it to a remote cluster (flag 1).
+     *
+     * <p>A wrong argument count or an unknown flag prints the usage text and
+     * exits with status 1.
+     *
+     * @param args mode flag, Redis host, Redis port
+     * @throws Exception if parsing the numeric arguments or running/submitting
+     *         the topology fails
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 3) {
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+
+        int flag = Integer.parseInt(args[0]);
+        String redisHost = args[1];
+        Integer redisPort = Integer.valueOf(args[2]);
+
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (flag == 0) {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHost, redisPort));
+                Thread.sleep(60 * 1000);
+                cluster.killTopology(TOPOLOGY_NAME);
+            } finally {
+                // Shut the in-process cluster down even if the run above failed.
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else if (flag == 1) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHost, redisPort));
+        } else {
+            // An unknown mode is a usage error, so report failure to the caller
+            // exactly like the argument-count check does.
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
new file mode 100644
index 0000000..116a58a
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import org.apache.storm.redis.trident.state.RedisClusterState;
+import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
+import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Example Trident topology that persists a fixed stream of (word, count)
+ * tuples through {@link RedisClusterState} and queries the stored values back.
+ *
+ * <p>Command line: mode flag (0 = local cluster, 1 = remote submission) and a
+ * comma-separated list of Redis cluster nodes in host:port form.
+ */
+public class WordCountTridentRedisCluster {
+    /** Name used to submit the topology and, in local mode, to kill it again. */
+    private static final String TOPOLOGY_NAME = "test_wordCounter_for_redis";
+
+    /** Help text printed whenever the command-line arguments cannot be used. */
+    private static final String USAGE =
+            "Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380";
+
+    /**
+     * Builds the example topology against a Redis cluster.
+     *
+     * @param redisHostPort comma-separated host:port entries, for example
+     *        "127.0.0.1:6379,127.0.0.1:6380"
+     * @return the assembled topology
+     * @throws IllegalArgumentException if an entry has no ":" separated port
+     *         or the port is not a number
+     */
+    public static StormTopology buildTopology(String redisHostPort){
+        Fields fields = new Fields("word", "count");
+        // Four fixed (word, count) tuples; the 4 is presumably the maximum
+        // batch size - confirm against FixedBatchSpout.
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        // Presumably makes the spout replay its fixed tuples endlessly - confirm.
+        spout.setCycle(true);
+
+        Set<InetSocketAddress> nodes = parseNodes(redisHostPort);
+        JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
+                                        .build();
+
+        RedisStoreMapper storeMapper = new WordCountStoreMapper();
+        RedisLookupMapper lookupMapper = new WordCountLookupMapper();
+        RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        // Write path: hand every (word, count) tuple to the store mapper.
+        // NOTE(review): withExpire appears to take seconds in storm-redis, which
+        // would make 86400000 roughly 1000 days rather than one day - confirm
+        // the intended time to live before changing it.
+        stream.partitionPersist(factory,
+                                fields,
+                                new RedisClusterStateUpdater(storeMapper).withExpire(86400000),
+                                new Fields());
+
+        // Read path: look each word up again and print the returned value.
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                                new RedisClusterStateQuerier(lookupMapper),
+                                new Fields("columnName","columnValue"));
+        stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
+        return topology.build();
+    }
+
+    /**
+     * Turns a comma-separated host:port list into socket addresses.
+     * Surrounding whitespace on each entry is ignored.
+     */
+    private static Set<InetSocketAddress> parseNodes(String redisHostPort) {
+        Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
+        for (String entry : redisHostPort.split(",")) {
+            String[] hostAndPort = entry.trim().split(":");
+            if (hostAndPort.length < 2) {
+                // Fail with a readable message instead of an index error.
+                throw new IllegalArgumentException(
+                        "Expected host:port but got '" + entry + "'");
+            }
+            nodes.add(new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
+        }
+        return nodes;
+    }
+
+    /**
+     * Runs the example in a local cluster for 60 seconds (flag 0) or submits
+     * it to a remote cluster (flag 1).
+     *
+     * <p>A wrong argument count or an unknown flag prints the usage text and
+     * exits with status 1.
+     *
+     * @param args mode flag and the comma-separated host:port list
+     * @throws Exception if parsing the arguments or running/submitting the
+     *         topology fails
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+
+        int flag = Integer.parseInt(args[0]);
+        String redisHostPort = args[1];
+
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (flag == 0) {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHostPort));
+                Thread.sleep(60 * 1000);
+                cluster.killTopology(TOPOLOGY_NAME);
+            } finally {
+                // Shut the in-process cluster down even if the run above failed.
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else if (flag == 1) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHostPort));
+        } else {
+            // Same usage text as the argument-count check: this program takes a
+            // host:port list, not separate redis-host and redis-port arguments.
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
new file mode 100644
index 0000000..fafb4e0
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.TupleMapper;
+import org.apache.storm.redis.trident.state.RedisClusterMapState;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Example Trident topology that sums word counts into a transactional
+ * {@link RedisClusterMapState} and prints the running sum for each word.
+ *
+ * <p>Command line: mode flag (0 = local cluster, 1 = remote submission) and a
+ * comma-separated list of Redis cluster nodes in host:port form.
+ */
+public class WordCountTridentRedisClusterMap {
+    /** Name used to submit the topology and, in local mode, to kill it again. */
+    private static final String TOPOLOGY_NAME = "test_wordCounter_for_redis";
+
+    /** Help text printed whenever the command-line arguments cannot be used. */
+    private static final String USAGE =
+            "Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380";
+
+    /**
+     * Builds the example topology against a Redis cluster.
+     *
+     * @param redisHostPort comma-separated host:port entries, for example
+     *        "127.0.0.1:6379,127.0.0.1:6380"
+     * @return the assembled topology
+     * @throws IllegalArgumentException if an entry has no ":" separated port
+     *         or the port is not a number
+     */
+    public static StormTopology buildTopology(String redisHostPort){
+        Fields fields = new Fields("word", "count");
+        // Four fixed (word, count) tuples; the 4 is presumably the maximum
+        // batch size - confirm against FixedBatchSpout.
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        // Presumably makes the spout replay its fixed tuples endlessly - confirm.
+        spout.setCycle(true);
+
+        Set<InetSocketAddress> nodes = parseNodes(redisHostPort);
+        JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
+                                        .build();
+        RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+                RedisDataTypeDescription.RedisDataType.HASH, "test");
+        StateFactory factory = RedisClusterMapState.transactional(clusterConfig, dataTypeDescription);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        // Write path: group by word and aggregate "count" with Sum into "sum".
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        // Read path: fetch the stored sum for each word and print it.
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+        return topology.build();
+    }
+
+    /**
+     * Turns a comma-separated host:port list into socket addresses.
+     * Surrounding whitespace on each entry is ignored.
+     */
+    private static Set<InetSocketAddress> parseNodes(String redisHostPort) {
+        Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
+        for (String entry : redisHostPort.split(",")) {
+            String[] hostAndPort = entry.trim().split(":");
+            if (hostAndPort.length < 2) {
+                // Fail with a readable message instead of an index error.
+                throw new IllegalArgumentException(
+                        "Expected host:port but got '" + entry + "'");
+            }
+            nodes.add(new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
+        }
+        return nodes;
+    }
+
+    /**
+     * Runs the example in a local cluster for 60 seconds (flag 0) or submits
+     * it to a remote cluster (flag 1).
+     *
+     * <p>A wrong argument count or an unknown flag prints the usage text and
+     * exits with status 1.
+     *
+     * @param args mode flag and the comma-separated host:port list
+     * @throws Exception if parsing the arguments or running/submitting the
+     *         topology fails
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+
+        int flag = Integer.parseInt(args[0]);
+        String redisHostPort = args[1];
+
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (flag == 0) {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHostPort));
+                Thread.sleep(60 * 1000);
+                cluster.killTopology(TOPOLOGY_NAME);
+            } finally {
+                // Shut the in-process cluster down even if the run above failed.
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else if (flag == 1) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHostPort));
+        } else {
+            // Same usage text as the argument-count check: this program takes a
+            // host:port list, not separate redis-host and redis-port arguments.
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
new file mode 100644
index 0000000..384f97c
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.trident.state.RedisMapState;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+
+/**
+ * Example Trident topology that sums word counts into a transactional
+ * {@link RedisMapState} and prints the running sum for each word.
+ *
+ * <p>Command line: mode flag (0 = local cluster, 1 = remote submission),
+ * Redis host, Redis port.
+ */
+public class WordCountTridentRedisMap {
+    /** Name used to submit the topology and, in local mode, to kill it again. */
+    private static final String TOPOLOGY_NAME = "test_wordCounter_for_redis";
+
+    /** Help text printed whenever the command-line arguments cannot be used. */
+    private static final String USAGE =
+            "Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port";
+
+    /**
+     * Builds the example topology against a single Redis server.
+     *
+     * @param redisHost host of the Redis server
+     * @param redisPort port of the Redis server
+     * @return the assembled topology
+     */
+    public static StormTopology buildTopology(String redisHost, Integer redisPort){
+        Fields fields = new Fields("word", "count");
+        // Four fixed (word, count) tuples; the 4 is presumably the maximum
+        // batch size - confirm against FixedBatchSpout.
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        // Presumably makes the spout replay its fixed tuples endlessly - confirm.
+        spout.setCycle(true);
+
+        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+                                        .setHost(redisHost).setPort(redisPort)
+                                        .build();
+
+        RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+                RedisDataTypeDescription.RedisDataType.HASH, "test");
+        StateFactory factory = RedisMapState.transactional(poolConfig, dataTypeDescription);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        // Write path: group by word and aggregate "count" with Sum into "sum".
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        // Read path: fetch the stored sum for each word and print it.
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+        return topology.build();
+    }
+
+    /**
+     * Runs the example in a local cluster for 60 seconds (flag 0) or submits
+     * it to a remote cluster (flag 1).
+     *
+     * <p>A wrong argument count or an unknown flag prints the usage text and
+     * exits with status 1.
+     *
+     * @param args mode flag, Redis host, Redis port
+     * @throws Exception if parsing the numeric arguments or running/submitting
+     *         the topology fails
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 3) {
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+
+        int flag = Integer.parseInt(args[0]);
+        String redisHost = args[1];
+        Integer redisPort = Integer.valueOf(args[2]);
+
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (flag == 0) {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHost, redisPort));
+                Thread.sleep(60 * 1000);
+                cluster.killTopology(TOPOLOGY_NAME);
+            } finally {
+                // Shut the in-process cluster down even if the run above failed.
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else if (flag == 1) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHost, redisPort));
+        } else {
+            // An unknown mode is a usage error, so report failure to the caller
+            // exactly like the argument-count check does.
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-solr-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-solr-examples/pom.xml b/examples/storm-solr-examples/pom.xml
new file mode 100644
index 0000000..e64ec5f
--- /dev/null
+++ b/examples/storm-solr-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-solr-examples</artifactId>
+
+    <dependencies>
+        <!-- Storm core API; "provided" scope, i.e. expected to be supplied by the
+             environment that runs the examples rather than by this module. -->
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- Solr integration the examples are written against.
+             NOTE(review): this is also "provided", so it would not be bundled with
+             the examples - confirm storm-solr is on the classpath wherever these
+             examples are run. -->
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-solr</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>