You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/12 04:16:10 UTC
[05/10] storm git commit: STORM-1970: external project examples
refactor
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-mqtt-examples/src/main/flux/sample.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/src/main/flux/sample.yaml b/examples/storm-mqtt-examples/src/main/flux/sample.yaml
new file mode 100644
index 0000000..c2902dc
--- /dev/null
+++ b/examples/storm-mqtt-examples/src/main/flux/sample.yaml
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+# topology definition
+# name to be used when submitting
+name: "mqtt-topology"
+
+components:
+  ########## MQTT Spout Config ############
+  - id: "mqtt-type"
+    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+  - id: "mqtt-options"
+    className: "org.apache.storm.mqtt.common.MqttOptions"
+    properties:
+      - name: "url"
+        value: "tcp://localhost:1883"
+      - name: "topics"
+        value:
+          - "/users/tgoetz/#"
+
+# topology configuration
+config:
+  topology.workers: 1
+  topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+  - id: "mqtt-spout"
+    className: "org.apache.storm.mqtt.spout.MqttSpout"
+    constructorArgs:
+      - ref: "mqtt-type"
+      - ref: "mqtt-options"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+# stream definitions
+streams:
+  - from: "mqtt-spout"
+    to: "log"
+    grouping:
+      type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml b/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml
new file mode 100644
index 0000000..bfb668d
--- /dev/null
+++ b/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+# topology definition
+# name to be used when submitting
+name: "mqtt-topology"
+
+components:
+  ########## MQTT Spout Config ############
+  - id: "mqtt-type"
+    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+  - id: "keystore-loader"
+    className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
+    constructorArgs:
+      - "keystore.jks"
+      - "truststore.jks"
+    properties:
+      - name: "keyPassword"
+        value: "password"  # placeholder credential for the example only
+      - name: "keyStorePassword"
+        value: "password"  # placeholder credential for the example only
+      - name: "trustStorePassword"
+        value: "password"  # placeholder credential for the example only
+
+  - id: "mqtt-options"
+    className: "org.apache.storm.mqtt.common.MqttOptions"
+    properties:
+      - name: "url"
+        value: "ssl://raspberrypi.local:8883"
+      - name: "topics"
+        value:
+          - "/users/tgoetz/#"
+
+# topology configuration
+config:
+  topology.workers: 1
+  topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+  - id: "mqtt-spout"
+    className: "org.apache.storm.mqtt.spout.MqttSpout"
+    constructorArgs:
+      - ref: "mqtt-type"
+      - ref: "mqtt-options"
+      - ref: "keystore-loader"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+# stream definitions
+
+streams:
+
+  - from: "mqtt-spout"
+    to: "log"
+    grouping:
+      type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
new file mode 100644
index 0000000..ec5645c
--- /dev/null
+++ b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.examples;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.MqttMessageMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Example {@link MqttMessageMapper} that turns a sensor reading into a five-field tuple.
+ *
+ * Given a topic name: "/users/{user}/{location}/{deviceId}" (note the leading slash)
+ * and a payload of "{temperature}/{humidity}"
+ * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float)
+ *
+ */
+public class CustomMessageMapper implements MqttMessageMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(CustomMessageMapper.class);
+
+    /**
+     * Converts an MQTT message into tuple values, in the order declared by {@link #outputFields()}.
+     *
+     * @param message the received message; its topic must split on "/" into at least five
+     *                elements and its payload into at least two numeric elements
+     * @return values for user, deviceId, location, temperature and humidity
+     * @throws ArrayIndexOutOfBoundsException if the topic or payload has too few "/"-separated elements
+     * @throws NumberFormatException if temperature or humidity is not a parsable float
+     */
+    public Values toValues(MqttMessage message) {
+        String topic = message.getTopic();
+        // A topic starting with "/" splits into ["", "users", user, location, deviceId],
+        // which is why the user is read from index 2 rather than index 1.
+        String[] topicElements = topic.split("/");
+        // Decode with an explicit charset so the result does not depend on the JVM's platform default.
+        String payload = new String(message.getMessage(), java.nio.charset.StandardCharsets.UTF_8);
+        String[] payloadElements = payload.split("/");
+
+        return new Values(topicElements[2], topicElements[4], topicElements[3], Float.parseFloat(payloadElements[0]),
+                Float.parseFloat(payloadElements[1]));
+    }
+
+    /**
+     * Declares the names of the fields produced by {@link #toValues(MqttMessage)}.
+     *
+     * @return the fields "user", "deviceId", "location", "temperature", "humidity"
+     */
+    public Fields outputFields() {
+        return new Fields("user", "deviceId", "location", "temperature", "humidity");
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java
new file mode 100644
index 0000000..fa8389d
--- /dev/null
+++ b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.examples;
+
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.storm.mqtt.MqttLogger;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/**
+ * Stand-alone helper for the MQTT examples: starts an embedded broker with an MQTT connector on
+ * localhost:1883, connects a blocking MQTT client to it, and publishes a random
+ * "{temperature}/{humidity}" payload every 500 ms until the JVM is stopped.
+ */
+public class MqttBrokerPublisher {
+    private static final Logger LOG = LoggerFactory.getLogger(MqttBrokerPublisher.class);
+
+    private static BrokerService broker;
+
+    private static BlockingConnection connection;
+
+
+    /**
+     * Starts the embedded broker and registers a JVM shutdown hook that stops it.
+     *
+     * @throws Exception if the connector cannot be added or the broker fails to start
+     */
+    public static void startBroker() throws Exception {
+        LOG.info("Starting broker...");
+        broker = new BrokerService();
+        broker.addConnector("mqtt://localhost:1883");
+        broker.setDataDirectory("target");
+        broker.start();
+        LOG.info("MQTT broker started");
+        Runtime.getRuntime().addShutdownHook(new Thread(){
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Shutting down MQTT broker...");
+                    broker.stop();
+                } catch (Exception e) {
+                    // Report through the logger rather than stderr so the failure follows the log configuration.
+                    LOG.error("Error while shutting down MQTT broker", e);
+                }
+            }
+        });
+    }
+
+    /**
+     * Creates the MQTT client connection to tcp://localhost:1883, registers a JVM shutdown
+     * hook that disconnects it, and then connects.
+     *
+     * @throws Exception if the client cannot be configured or the connection attempt fails
+     */
+    public static void startPublisher() throws Exception {
+        MQTT client = new MQTT();
+        client.setTracer(new MqttLogger());
+        client.setHost("tcp://localhost:1883");
+        client.setClientId("MqttBrokerPublisher");
+        connection = client.blockingConnection();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(){
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Shutting down MQTT client...");
+                    connection.disconnect();
+                } catch (Exception e) {
+                    // Report through the logger rather than stderr so the failure follows the log configuration.
+                    LOG.error("Error while shutting down MQTT client", e);
+                }
+            }
+        });
+
+        connection.connect();
+    }
+
+    /**
+     * Publishes random readings to "/users/tgoetz/office/1234" forever, one every 500 ms.
+     * Temperature and humidity are each a random integer in [0, 100). This method only
+     * exits by throwing.
+     *
+     * @throws Exception if publishing fails or the thread is interrupted while sleeping
+     */
+    public static void publish() throws Exception {
+        String topic = "/users/tgoetz/office/1234";
+        Random rand = new Random();
+        LOG.info("Publishing to topic {}", topic);
+        LOG.info("Cntrl+C to exit.");
+
+        while(true) {
+            int temp = rand.nextInt(100);
+            int hum = rand.nextInt(100);
+            String payload = temp + "/" + hum;
+
+            // Encode with an explicit charset so the bytes on the wire do not depend on the platform default.
+            byte[] body = payload.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            connection.publish(topic, body, QoS.AT_LEAST_ONCE, false);
+            Thread.sleep(500);
+        }
+    }
+
+    /**
+     * Entry point: starts the broker, connects the publisher, then publishes until stopped.
+     *
+     * @param args unused
+     * @throws Exception if any of the three steps fails
+     */
+    public static void main(String[] args) throws Exception{
+        startBroker();
+        startPublisher();
+        publish();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-mqtt-examples/src/main/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/src/main/resources/log4j2.xml b/examples/storm-mqtt-examples/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..bfe57a1
--- /dev/null
+++ b/examples/storm-mqtt-examples/src/main/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+    <Appenders>
+        <!-- Single appender: everything goes to standard output. -->
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+        </Console>
+    </Appenders>
+
+    <Loggers>
+        <!-- Named loggers for the Flux wrappers and the MQTT integration; the root logger stays at error. -->
+        <Logger name="org.apache.storm.flux.wrappers" level="INFO"/>
+        <Logger name="org.apache.storm.mqtt" level="DEBUG"/>
+        <Root level="error">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-opentsdb-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/pom.xml b/examples/storm-opentsdb-examples/pom.xml
new file mode 100644
index 0000000..057efae
--- /dev/null
+++ b/examples/storm-opentsdb-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <!-- Inherits from the root Storm POM two directories up. -->
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-opentsdb-examples</artifactId>
+
+    <!-- Both dependencies track this module's own version and are declared with provided scope. -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-opentsdb</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java
new file mode 100644
index 0000000..b0580f6
--- /dev/null
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * BatchSpout implementation for metrics generation.
+ *
+ * Each batch holds {@code batchSize} generated "device.temp" data points. Generated batches
+ * are cached by batch id until acked, so re-emitting the same batch id replays the same tuples.
+ */
+public class MetricGenBatchSpout implements IBatchSpout {
+
+    private final int batchSize;
+    // Un-acked batches keyed by batch id, kept so a replayed batch emits identical tuples.
+    private final Map<Long, List<List<Object>>> batches = new HashMap<>();
+
+    /**
+     * @param batchSize number of tuples to generate for each new batch
+     */
+    public MetricGenBatchSpout(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) {
+        // nothing to initialise
+    }
+
+    /**
+     * Emits the tuples for {@code batchId}, generating and caching them on first use.
+     *
+     * @param batchId   id of the batch to emit
+     * @param collector collector that receives every tuple of the batch
+     */
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        // Only non-null lists are ever stored, so a null lookup means "not generated yet".
+        List<List<Object>> values = batches.get(batchId);
+        if (values == null) {
+            values = new ArrayList<>();
+            // One generator per batch instead of two fresh Random instances per tuple.
+            Random random = new Random();
+            for (int i = 0; i < batchSize; i++) {
+                // tuple values are mapped with
+                // metric, timestamp, value, Map of tagK/tagV respectively.
+                // nextInt() % 64 may be negative, so loc.id ranges over -63..63.
+                values.add(Lists.<Object>newArrayList("device.temp", System.currentTimeMillis(), random.nextLong(),
+                        Collections.singletonMap("loc.id", random.nextInt() % 64 + "")));
+            }
+            batches.put(batchId, values);
+        }
+        for (List<Object> value : values) {
+            collector.emit(value);
+        }
+
+    }
+
+    /**
+     * Drops the cached tuples of an acknowledged batch.
+     *
+     * @param batchId id of the batch that no longer needs to be replayable
+     */
+    @Override
+    public void ack(long batchId) {
+        batches.remove(batchId);
+    }
+
+    @Override
+    public void close() {
+        // nothing to release
+    }
+
+    /**
+     * @return a configuration that sets the maximum task parallelism to 1
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1);
+        return conf;
+    }
+
+    /**
+     * @return the same field layout that {@link MetricGenSpout} declares
+     */
+    @Override
+    public Fields getOutputFields() {
+        return MetricGenSpout.DEFAULT_METRIC_FIELDS;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenSpout.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenSpout.java
new file mode 100644
index 0000000..21af196
--- /dev/null
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenSpout.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Spout to generate tuples containing metric data.
+ *
+ * Roughly every 100 ms it emits one "device.temp" data point whose fields follow
+ * {@link #DEFAULT_METRIC_FIELDS}.
+ */
+public class MetricGenSpout extends BaseRichSpout {
+
+    /** Field names taken from the default OpenTSDB tuple mapper: metric, timestamp, value, tags. */
+    public static final Fields DEFAULT_METRIC_FIELDS =
+            new Fields(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getMetricField(),
+                    TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getTimestampField(),
+                    TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getValueField(),
+                    TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getTagsField());
+
+    private Map conf;
+    private TopologyContext context;
+    private SpoutOutputCollector collector;
+    // Created in open() and reused, instead of two fresh Random instances per tuple.
+    private transient Random random;
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(DEFAULT_METRIC_FIELDS);
+    }
+
+    /**
+     * Stores the configuration, context and collector and creates the random generator.
+     */
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.conf = conf;
+        this.context = context;
+        this.collector = collector;
+        this.random = new Random();
+    }
+
+    /**
+     * Sleeps for 100 ms and then emits one generated data point. If the thread is interrupted
+     * while sleeping, the interrupt flag is restored and nothing is emitted for this call.
+     */
+    @Override
+    public void nextTuple() {
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            // Restore the flag so the calling thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            return;
+        }
+        // tuple values are mapped with
+        // metric, timestamp, value, Map of tagK/tagV respectively.
+        // nextInt() % 64 may be negative, so loc.id ranges over -63..63.
+        collector.emit(Lists.newArrayList("device.temp", System.currentTimeMillis(), random.nextLong(),
+                Collections.singletonMap("loc.id", random.nextInt() % 64 + "")));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
new file mode 100644
index 0000000..6c511b8
--- /dev/null
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import org.apache.storm.opentsdb.bolt.OpenTsdbBolt;
+import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
+import org.apache.storm.opentsdb.client.OpenTsdbClient;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+
+import java.util.Collections;
+
+/**
+ * Sample application to use OpenTSDB bolt.
+ *
+ * Wires a {@link MetricGenSpout} to an {@link OpenTsdbBolt}. With one argument the topology
+ * runs in a local cluster for 30 seconds; with a second argument it is submitted under that name.
+ */
+public class SampleOpenTsdbBoltTopology {
+
+    /**
+     * @param args {@code args[0]} is the OpenTSDB URL (required); {@code args[1]}, if present,
+     *             is the topology name used for cluster submission
+     * @throws IllegalArgumentException if no argument is given
+     * @throws Exception if submission or the local run fails
+     */
+    public static void main(String[] args) throws Exception {
+        if(args.length == 0) {
+            throw new IllegalArgumentException("There should be at least one argument. Run as `SampleOpenTsdbBoltTopology <tsdb-url>`");
+        }
+
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+        topologyBuilder.setSpout("metric-gen", new MetricGenSpout(), 5);
+
+        String openTsdbUrl = args[0];
+        OpenTsdbClient.Builder builder = OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();
+        final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+        openTsdbBolt.withBatchSize(10).withFlushInterval(2).failTupleForFailedMetrics();
+        topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args.length > 1) {
+            conf.setNumWorkers(3);
+
+            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, topologyBuilder.createTopology());
+        } else {
+            conf.setMaxTaskParallelism(3);
+
+            LocalCluster cluster = new LocalCluster();
+            try {
+                // NOTE(review): "word-count" looks like a leftover from another example; kept unchanged here.
+                cluster.submitTopology("word-count", conf, topologyBuilder.createTopology());
+
+                Thread.sleep(30000);
+            } finally {
+                // Shut the in-process cluster down even if submission or the sleep throws.
+                cluster.shutdown();
+            }
+            System.exit(0);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
new file mode 100644
index 0000000..db51a8a
--- /dev/null
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.opentsdb;
+
+import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
+import org.apache.storm.opentsdb.client.OpenTsdbClient;
+import org.apache.storm.opentsdb.trident.OpenTsdbStateFactory;
+import org.apache.storm.opentsdb.trident.OpenTsdbStateUpdater;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.Consumer;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * Sample trident topology to store time series metrics in to OpenTsdb.
+ *
+ * Reads batches from a {@link MetricGenBatchSpout}, logs every tuple, and persists them through
+ * an {@link OpenTsdbStateFactory}. With one argument the topology runs in a local cluster for
+ * 30 seconds; with a second argument it is submitted under that name.
+ */
+public class SampleOpenTsdbTridentTopology {
+    private static final Logger LOG = LoggerFactory.getLogger(SampleOpenTsdbTridentTopology.class);
+
+    /**
+     * @param args {@code args[0]} is the OpenTSDB URL (required); {@code args[1]}, if present,
+     *             is the topology name used for cluster submission
+     * @throws IllegalArgumentException if no argument is given
+     * @throws Exception if submission or the local run fails
+     */
+    public static void main(String[] args) throws Exception {
+        if(args.length == 0) {
+            throw new IllegalArgumentException("There should be at least one argument. Run as `SampleOpenTsdbTridentTopology <tsdb-url>`");
+        }
+
+        String tsdbUrl = args[0];
+
+
+        final OpenTsdbClient.Builder openTsdbClientBuilder = OpenTsdbClient.newBuilder(tsdbUrl);
+        final OpenTsdbStateFactory openTsdbStateFactory =
+                new OpenTsdbStateFactory(openTsdbClientBuilder,
+                        Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+
+        TridentTopology tridentTopology = new TridentTopology();
+        final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenBatchSpout(10));
+
+        // Log each tuple as it passes, then hand it to the OpenTSDB state for persistence.
+        stream.peek(new Consumer() {
+            @Override
+            public void accept(TridentTuple input) {
+                LOG.info("########### Received tuple: [{}]", input);
+            }
+        }).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater());
+
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args.length > 1) {
+            conf.setNumWorkers(3);
+
+            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, tridentTopology.build());
+        } else {
+            conf.setMaxTaskParallelism(3);
+
+            LocalCluster cluster = new LocalCluster();
+            try {
+                // NOTE(review): "word-count" looks like a leftover from another example; kept unchanged here.
+                cluster.submitTopology("word-count", conf, tridentTopology.build());
+
+                Thread.sleep(30000);
+            } finally {
+                // Shut the in-process cluster down even if submission or the sleep throws.
+                cluster.shutdown();
+            }
+            System.exit(0);
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/pom.xml b/examples/storm-redis-examples/pom.xml
new file mode 100644
index 0000000..91a8660
--- /dev/null
+++ b/examples/storm-redis-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <!-- Inherits from the root Storm POM two directories up. -->
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-redis-examples</artifactId>
+
+    <!-- Both dependencies track this module's own version and are declared with provided scope. -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-redis</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
new file mode 100644
index 0000000..f62b7b0
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.bolt.RedisLookupBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class LookupWordCount {
+ private static final String WORD_SPOUT = "WORD_SPOUT";
+ private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+ private static final String PRINT_BOLT = "PRINT_BOLT";
+
+ private static final String TEST_REDIS_HOST = "127.0.0.1";
+ private static final int TEST_REDIS_PORT = 6379;
+
+ public static class PrintWordTotalCountBolt extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(PrintWordTotalCountBolt.class);
+ private static final Random RANDOM = new Random();
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String wordName = input.getStringByField("wordName");
+ String countStr = input.getStringByField("count");
+
+ // print lookup result with low probability
+ if(RANDOM.nextInt(1000) > 995) {
+ int count = 0;
+ if (countStr != null) {
+ count = Integer.parseInt(countStr);
+ }
+ LOG.info("Lookup result - word : " + wordName + " / count : " + count);
+ }
+
+ collector.ack(input);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config config = new Config();
+
+ String host = TEST_REDIS_HOST;
+ int port = TEST_REDIS_PORT;
+
+ if (args.length >= 2) {
+ host = args[0];
+ port = Integer.parseInt(args[1]);
+ }
+
+ JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+ .setHost(host).setPort(port).build();
+
+ WordSpout spout = new WordSpout();
+ RedisLookupMapper lookupMapper = setupLookupMapper();
+ RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
+
+ PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt();
+
+ //wordspout -> lookupbolt
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(WORD_SPOUT, spout, 1);
+ builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(LOOKUP_BOLT);
+
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, builder.createTopology());
+ Thread.sleep(30000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ System.exit(0);
+ } else if (args.length == 3) {
+ StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+ } else{
+ System.out.println("Usage: LookupWordCount <redis host> <redis port> (topology name)");
+ }
+ }
+
+ private static RedisLookupMapper setupLookupMapper() {
+ return new WordCountRedisLookupMapper();
+ }
+
+ private static class WordCountRedisLookupMapper implements RedisLookupMapper {
+ private RedisDataTypeDescription description;
+ private final String hashKey = "wordCount";
+
+ public WordCountRedisLookupMapper() {
+ description = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+ }
+
+ @Override
+ public List<Values> toTuple(ITuple input, Object value) {
+ String member = getKeyFromTuple(input);
+ List<Values> values = Lists.newArrayList();
+ values.add(new Values(member, value));
+ return values;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("wordName", "count"));
+ }
+
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return description;
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return tuple.getStringByField("word");
+ }
+
+ @Override
+ public String getValueFromTuple(ITuple tuple) {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
new file mode 100644
index 0000000..d46bab6
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.redis.bolt.AbstractRedisBolt;
+import org.apache.storm.redis.bolt.RedisStoreBolt;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisException;
+
+public class PersistentWordCount {
+ private static final String WORD_SPOUT = "WORD_SPOUT";
+ private static final String COUNT_BOLT = "COUNT_BOLT";
+ private static final String STORE_BOLT = "STORE_BOLT";
+
+ private static final String TEST_REDIS_HOST = "127.0.0.1";
+ private static final int TEST_REDIS_PORT = 6379;
+
+ public static void main(String[] args) throws Exception {
+ Config config = new Config();
+
+ String host = TEST_REDIS_HOST;
+ int port = TEST_REDIS_PORT;
+
+ if (args.length >= 2) {
+ host = args[0];
+ port = Integer.parseInt(args[1]);
+ }
+
+ JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+ .setHost(host).setPort(port).build();
+
+ WordSpout spout = new WordSpout();
+ WordCounter bolt = new WordCounter();
+ RedisStoreMapper storeMapper = setupStoreMapper();
+ RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
+
+ // wordSpout ==> countBolt ==> RedisBolt
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout(WORD_SPOUT, spout, 1);
+ builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word"));
+ builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT);
+
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, builder.createTopology());
+ Thread.sleep(30000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ System.exit(0);
+ } else if (args.length == 3) {
+ StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+ } else {
+ System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");
+ }
+ }
+
+ private static RedisStoreMapper setupStoreMapper() {
+ return new WordCountStoreMapper();
+ }
+
+ private static class WordCountStoreMapper implements RedisStoreMapper {
+ private RedisDataTypeDescription description;
+ private final String hashKey = "wordCount";
+
+ public WordCountStoreMapper() {
+ description = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+ }
+
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return description;
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return tuple.getStringByField("word");
+ }
+
+ @Override
+ public String getValueFromTuple(ITuple tuple) {
+ return tuple.getStringByField("count");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
new file mode 100644
index 0000000..bcb2e0b
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.redis.bolt.RedisFilterBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisFilterMapper;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Random;
+
+public class WhitelistWordCount {
+ private static final String WORD_SPOUT = "WORD_SPOUT";
+ private static final String WHITELIST_BOLT = "WHITELIST_BOLT";
+ private static final String COUNT_BOLT = "COUNT_BOLT";
+ private static final String PRINT_BOLT = "PRINT_BOLT";
+
+ private static final String TEST_REDIS_HOST = "127.0.0.1";
+ private static final int TEST_REDIS_PORT = 6379;
+
+ public static class PrintWordTotalCountBolt extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(PrintWordTotalCountBolt.class);
+ private static final Random RANDOM = new Random();
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String wordName = input.getStringByField("word");
+ String countStr = input.getStringByField("count");
+
+ // print lookup result with low probability
+ if(RANDOM.nextInt(1000) > 995) {
+ int count = 0;
+ if (countStr != null) {
+ count = Integer.parseInt(countStr);
+ }
+ LOG.info("Count result - word : " + wordName + " / count : " + count);
+ }
+
+ collector.ack(input);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config config = new Config();
+
+ String host = TEST_REDIS_HOST;
+ int port = TEST_REDIS_PORT;
+
+ if (args.length >= 2) {
+ host = args[0];
+ port = Integer.parseInt(args[1]);
+ }
+
+ JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+ .setHost(host).setPort(port).build();
+
+ WordSpout spout = new WordSpout();
+ RedisFilterMapper filterMapper = setupWhitelistMapper();
+ RedisFilterBolt whitelistBolt = new RedisFilterBolt(poolConfig, filterMapper);
+ WordCounter wordCounterBolt = new WordCounter();
+ PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt();
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(WORD_SPOUT, spout, 1);
+ builder.setBolt(WHITELIST_BOLT, whitelistBolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(COUNT_BOLT, wordCounterBolt, 1).fieldsGrouping(WHITELIST_BOLT, new Fields("word"));
+ builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(COUNT_BOLT);
+
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, builder.createTopology());
+ Thread.sleep(30000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ System.exit(0);
+ } else if (args.length == 3) {
+ StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+ } else{
+ System.out.println("Usage: WhitelistWordCount <redis host> <redis port> (topology name)");
+ }
+ }
+
+ private static RedisFilterMapper setupWhitelistMapper() {
+ return new WhitelistWordFilterMapper();
+ }
+
+ private static class WhitelistWordFilterMapper implements RedisFilterMapper {
+ private RedisDataTypeDescription description;
+ private final String setKey = "whitelist";
+
+ public WhitelistWordFilterMapper() {
+ description = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.SET, setKey);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return description;
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return tuple.getStringByField("word");
+ }
+
+ @Override
+ public String getValueFromTuple(ITuple tuple) {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java
new file mode 100644
index 0000000..6fa930c
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+/**
+ * Basic bolt that keeps an in-memory running count per word and emits the
+ * updated ("word", "count") pair for every incoming tuple. The count is emitted
+ * as a string.
+ */
+public class WordCounter implements IBasicBolt {
+    // Running totals, keyed by word; lives only as long as this bolt instance.
+    private Map<String, Integer> wordCounter = Maps.newHashMap();
+
+    /** No setup is required. */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map stormConf, TopologyContext context) {
+    }
+
+    /**
+     * Increments the count for the tuple's "word" field and emits the new total.
+     *
+     * @param input     tuple carrying a "word" string field
+     * @param collector collector that receives the (word, count-as-string) pair
+     */
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        String word = input.getStringByField("word");
+
+        // One lookup and one write per tuple; a missing entry starts at 1.
+        Integer previous = wordCounter.get(word);
+        int count = (previous == null) ? 1 : previous + 1;
+        wordCounter.put(word, count);
+
+        collector.emit(new Values(word, String.valueOf(count)));
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void cleanup() {
+
+    }
+
+    /** Declares the two output fields: "word" and "count". */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+
+    /** No component-specific configuration; returns null. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java
new file mode 100644
index 0000000..e2cdfde
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.topology;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Spout that endlessly emits one randomly chosen word from {@link #words} per
+ * {@link #nextTuple()} call, each tagged with a fresh random UUID as message id.
+ */
+public class WordSpout implements IRichSpout {
+    // Shared generator; avoids allocating a new Random on every nextTuple() call.
+    private static final Random RANDOM = new Random();
+
+    // Stored by the constructor and returned by isDistributed(); not read elsewhere in this class.
+    boolean isDistributed;
+    SpoutOutputCollector collector;
+    public static final String[] words = new String[] { "apple", "orange", "pineapple", "banana", "watermelon" };
+
+    /** Creates a spout with {@code isDistributed} set to true. */
+    public WordSpout() {
+        this(true);
+    }
+
+    /**
+     * @param isDistributed value later reported by {@link #isDistributed()}
+     */
+    public WordSpout(boolean isDistributed) {
+        this.isDistributed = isDistributed;
+    }
+
+    /** @return the flag passed to the constructor */
+    public boolean isDistributed() {
+        return this.isDistributed;
+    }
+
+    /** Keeps the collector for use in {@link #nextTuple()}. */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() {
+
+    }
+
+    /**
+     * Emits a single-field tuple holding a random entry of {@link #words},
+     * using a new random UUID as the message id, then yields the thread.
+     */
+    @Override
+    public void nextTuple() {
+        final String word = words[RANDOM.nextInt(words.length)];
+        this.collector.emit(new Values(word), UUID.randomUUID());
+        Thread.yield();
+    }
+
+    /** Acks are ignored. */
+    @Override
+    public void ack(Object msgId) {
+
+    }
+
+    /** Failures are ignored; nothing is replayed. */
+    @Override
+    public void fail(Object msgId) {
+
+    }
+
+    /** Declares the single output field "word". */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+
+    @Override
+    public void activate() {
+    }
+
+    @Override
+    public void deactivate() {
+    }
+
+    /** No component-specific configuration; returns null. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/PrintFunction.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/PrintFunction.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/PrintFunction.java
new file mode 100644
index 0000000..37d3936
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/PrintFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.Random;
+
+/**
+ * Trident function that logs a small random sample of the tuples it sees and
+ * emits nothing.
+ */
+public class PrintFunction extends BaseFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrintFunction.class);
+
+    private static final Random RANDOM = new Random();
+
+    /**
+     * Logs the tuple's string form when a draw from 0..999 lands above 995;
+     * otherwise does nothing.
+     */
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
+        final int draw = RANDOM.nextInt(1000);
+        if (draw <= 995) {
+            // Not sampled this time.
+            return;
+        }
+        LOG.info(tuple.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountLookupMapper.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
new file mode 100644
index 0000000..a6ca8c9
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Lookup mapper for the Trident word-count example. It describes a Redis HASH
+ * named "test" and derives keys by prefixing the tuple's first value with
+ * "test_".
+ */
+public class WordCountLookupMapper implements RedisLookupMapper {
+    /** Pairs the derived key with the looked-up value in a one-element list. */
+    @Override
+    public List<Values> toTuple(ITuple input, Object value) {
+        final Values row = new Values(getKeyFromTuple(input), value);
+        final List<Values> rows = new ArrayList<Values>();
+        rows.add(row);
+        return rows;
+    }
+
+    /** Declares the output fields "word" and "value". */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        final Fields outputFields = new Fields("word", "value");
+        declarer.declare(outputFields);
+    }
+
+    /** Builds a fresh HASH description named "test" on every call. */
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        final RedisDataTypeDescription.RedisDataType type = RedisDataTypeDescription.RedisDataType.HASH;
+        return new RedisDataTypeDescription(type, "test");
+    }
+
+    /** The key is "test_" followed by the string at tuple index 0. */
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        final String word = tuple.getString(0);
+        return "test_" + word;
+    }
+
+    /** The value is the integer at tuple index 1, rendered as a string. */
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        final Integer count = tuple.getInteger(1);
+        return count.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
new file mode 100644
index 0000000..58df150
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+
+/**
+ * Store mapper for the Trident word-count example. It describes a Redis HASH
+ * named "test", derives keys by prefixing the tuple's first value with "test_",
+ * and stores the tuple's second value as a string.
+ */
+public class WordCountStoreMapper implements RedisStoreMapper {
+    /** Builds a fresh HASH description named "test" on every call. */
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        final RedisDataTypeDescription.RedisDataType type = RedisDataTypeDescription.RedisDataType.HASH;
+        return new RedisDataTypeDescription(type, "test");
+    }
+
+    /** The key is "test_" followed by the string at tuple index 0. */
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        final String word = tuple.getString(0);
+        return "test_" + word;
+    }
+
+    /** The value is the integer at tuple index 1, rendered as a string. */
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        final Integer count = tuple.getInteger(1);
+        return count.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
new file mode 100644
index 0000000..e3eb0f9
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateQuerier;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+
+public class WordCountTridentRedis {
+ public static StormTopology buildTopology(String redisHost, Integer redisPort){
+ Fields fields = new Fields("word", "count");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", 1),
+ new Values("trident", 1),
+ new Values("needs", 1),
+ new Values("javadoc", 1)
+ );
+ spout.setCycle(true);
+
+ JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+ .setHost(redisHost).setPort(redisPort)
+ .build();
+
+ RedisStoreMapper storeMapper = new WordCountStoreMapper();
+ RedisLookupMapper lookupMapper = new WordCountLookupMapper();
+ RedisState.Factory factory = new RedisState.Factory(poolConfig);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory,
+ fields,
+ new RedisStateUpdater(storeMapper).withExpire(86400000),
+ new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new RedisStateQuerier(lookupMapper),
+ new Fields("columnName","columnValue"));
+ stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 3) {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ System.exit(1);
+ }
+
+ Integer flag = Integer.valueOf(args[0]);
+ String redisHost = args[1];
+ Integer redisPort = Integer.valueOf(args[2]);
+
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if (flag == 0) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+ Thread.sleep(60 * 1000);
+ cluster.killTopology("test_wordCounter_for_redis");
+ cluster.shutdown();
+ System.exit(0);
+ } else if(flag == 1) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+ } else {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
new file mode 100644
index 0000000..116a58a
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import org.apache.storm.redis.trident.state.RedisClusterState;
+import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
+import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Example Trident topology that writes word/count tuples from a cycling
+ * {@link FixedBatchSpout} into a Redis Cluster through {@link RedisClusterState},
+ * queries the same state back and passes the result to {@code PrintFunction}.
+ */
+public class WordCountTridentRedisCluster {
+    /** Name used when submitting and killing the topology. */
+    private static final String TOPOLOGY_NAME = "test_wordCounter_for_redis";
+
+    /** Help text; the second argument is a comma-separated list of host:port pairs. */
+    private static final String USAGE =
+            "Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380";
+
+    /**
+     * Builds the example topology.
+     *
+     * @param redisHostPort comma-separated {@code host:port} entries naming the
+     *                      Redis Cluster nodes, e.g. {@code 127.0.0.1:6379,127.0.0.1:6380}
+     * @return the assembled topology
+     * @throws IllegalArgumentException if an entry has no {@code :port} part or
+     *                                  the port is not a valid integer
+     */
+    public static StormTopology buildTopology(String redisHostPort){
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        spout.setCycle(true);
+
+        JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder()
+                .setNodes(parseNodes(redisHostPort))
+                .build();
+
+        RedisStoreMapper storeMapper = new WordCountStoreMapper();
+        RedisLookupMapper lookupMapper = new WordCountLookupMapper();
+        RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        // NOTE(review): 86400000 looks like one day expressed in milliseconds;
+        // confirm the unit withExpire() expects before relying on this TTL.
+        stream.partitionPersist(factory,
+                fields,
+                new RedisClusterStateUpdater(storeMapper).withExpire(86400000),
+                new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                new RedisClusterStateQuerier(lookupMapper),
+                new Fields("columnName","columnValue"));
+        stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
+        return topology.build();
+    }
+
+    /** Parses a comma-separated list of host:port entries into socket addresses. */
+    private static Set<InetSocketAddress> parseNodes(String redisHostPort) {
+        Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
+        for (String entry : redisHostPort.split(",")) {
+            String[] hostAndPort = entry.trim().split(":");
+            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+            if (hostAndPort.length < 2) {
+                throw new IllegalArgumentException(
+                        "Expected host:port but got '" + entry + "' in '" + redisHostPort + "'");
+            }
+            nodes.add(new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
+        }
+        return nodes;
+    }
+
+    /**
+     * Command-line entry point.
+     *
+     * @param args exactly two values: the run mode ({@code 0} for an in-process
+     *             LocalCluster, {@code 1} for submission through StormSubmitter)
+     *             and the comma-separated {@code host:port} node list
+     * @throws Exception if the topology cannot be built or submitted
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+
+        int flag = Integer.parseInt(args[0]);
+        String redisHostPort = args[1];
+
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (flag == 0) {
+            // Local mode: run for one minute, then tear everything down and exit.
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHostPort));
+            Thread.sleep(60 * 1000);
+            cluster.killTopology(TOPOLOGY_NAME);
+            cluster.shutdown();
+            System.exit(0);
+        } else if (flag == 1) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHostPort));
+        } else {
+            // Unknown mode: show the help text that matches this program's arguments.
+            System.out.println(USAGE);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
new file mode 100644
index 0000000..fafb4e0
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.TupleMapper;
+import org.apache.storm.redis.trident.state.RedisClusterMapState;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Example Trident topology that sums the "count" field per word into a
+ * transactional {@link RedisClusterMapState} and reads the running sum back
+ * with {@link MapGet}, passing the result to {@code PrintFunction}.
+ */
+public class WordCountTridentRedisClusterMap {
+    /** Name used when submitting and killing the topology. */
+    private static final String TOPOLOGY_NAME = "test_wordCounter_for_redis";
+
+    /** Help text; the second argument is a comma-separated list of host:port pairs. */
+    private static final String USAGE =
+            "Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380";
+
+    /**
+     * Builds the example topology.
+     *
+     * @param redisHostPort comma-separated {@code host:port} entries naming the
+     *                      Redis Cluster nodes, e.g. {@code 127.0.0.1:6379,127.0.0.1:6380}
+     * @return the assembled topology
+     * @throws IllegalArgumentException if an entry has no {@code :port} part or
+     *                                  the port is not a valid integer
+     */
+    public static StormTopology buildTopology(String redisHostPort){
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        spout.setCycle(true);
+
+        JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder()
+                .setNodes(parseNodes(redisHostPort))
+                .build();
+        // Values are stored using the HASH data type with "test" as the additional key.
+        RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+                RedisDataTypeDescription.RedisDataType.HASH, "test");
+        StateFactory factory = RedisClusterMapState.transactional(clusterConfig, dataTypeDescription);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+        return topology.build();
+    }
+
+    /** Parses a comma-separated list of host:port entries into socket addresses. */
+    private static Set<InetSocketAddress> parseNodes(String redisHostPort) {
+        Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
+        for (String entry : redisHostPort.split(",")) {
+            String[] hostAndPort = entry.trim().split(":");
+            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+            if (hostAndPort.length < 2) {
+                throw new IllegalArgumentException(
+                        "Expected host:port but got '" + entry + "' in '" + redisHostPort + "'");
+            }
+            nodes.add(new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
+        }
+        return nodes;
+    }
+
+    /**
+     * Command-line entry point.
+     *
+     * @param args exactly two values: the run mode ({@code 0} for an in-process
+     *             LocalCluster, {@code 1} for submission through StormSubmitter)
+     *             and the comma-separated {@code host:port} node list
+     * @throws Exception if the topology cannot be built or submitted
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+
+        int flag = Integer.parseInt(args[0]);
+        String redisHostPort = args[1];
+
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (flag == 0) {
+            // Local mode: run for one minute, then tear everything down and exit.
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHostPort));
+            Thread.sleep(60 * 1000);
+            cluster.killTopology(TOPOLOGY_NAME);
+            cluster.shutdown();
+            System.exit(0);
+        } else if (flag == 1) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, buildTopology(redisHostPort));
+        } else {
+            // Unknown mode: show the help text that matches this program's arguments.
+            System.out.println(USAGE);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
new file mode 100644
index 0000000..384f97c
--- /dev/null
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.trident.state.RedisMapState;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+
+public class WordCountTridentRedisMap {
+ public static StormTopology buildTopology(String redisHost, Integer redisPort){
+ Fields fields = new Fields("word", "count");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", 1),
+ new Values("trident", 1),
+ new Values("needs", 1),
+ new Values("javadoc", 1)
+ );
+ spout.setCycle(true);
+
+ JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+ .setHost(redisHost).setPort(redisPort)
+ .build();
+
+ RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, "test");
+ StateFactory factory = RedisMapState.transactional(poolConfig, dataTypeDescription);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ TridentState state = stream.groupBy(new Fields("word"))
+ .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+ stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+ .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 3) {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ System.exit(1);
+ }
+
+ Integer flag = Integer.valueOf(args[0]);
+ String redisHost = args[1];
+ Integer redisPort = Integer.valueOf(args[2]);
+
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if (flag == 0) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+ Thread.sleep(60 * 1000);
+ cluster.killTopology("test_wordCounter_for_redis");
+ cluster.shutdown();
+ System.exit(0);
+ } else if(flag == 1) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+ } else {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-solr-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-solr-examples/pom.xml b/examples/storm-solr-examples/pom.xml
new file mode 100644
index 0000000..e64ec5f
--- /dev/null
+++ b/examples/storm-solr-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- Maven module definition for the storm-solr example project. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- Inherits from the org.apache.storm:storm parent POM two directories up. -->
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-solr-examples</artifactId>
+
+ <!-- Both dependencies track this module's own version and use "provided" scope. -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- NOTE(review): storm-solr is also "provided", so it is not packaged with this
+ module's artifact; confirm that is intended for a runnable example. -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-solr</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>