You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/12 04:16:20 UTC
[06/12] storm git commit: STORM-1970: external project examples
refactor
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
new file mode 100644
index 0000000..788b33c
--- /dev/null
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.hdfs.common.rotation.MoveFileAction;
+import org.apache.storm.hdfs.trident.format.*;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Map;
+
+public class TridentSequenceTopology {
+
+ public static StormTopology buildTopology(String hdfsUrl){
+ FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence", "key"), 1000, new Values("the cow jumped over the moon", 1l),
+ new Values("the man went to the store and bought some candy", 2l), new Values("four score and seven years ago", 3l),
+ new Values("how many apples can you eat", 4l), new Values("to be or not to be the person", 5l));
+ spout.setCycle(true);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ Fields hdfsFields = new Fields("sentence", "key");
+
+ FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+ .withPath("/tmp/trident")
+ .withPrefix("trident")
+ .withExtension(".seq");
+
+ FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
+
+ HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
+ .withFileNameFormat(fileNameFormat)
+ .withSequenceFormat(new DefaultSequenceFormat("key", "sentence"))
+ .withRotationPolicy(rotationPolicy)
+ .withFsUrl(hdfsUrl)
+ .withConfigKey("hdfs.config")
+ .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/"));
+ StateFactory factory = new HdfsStateFactory().withOptions(seqOpts);
+
+ TridentState state = stream
+ .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
+
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+
+ Yaml yaml = new Yaml();
+ InputStream in = new FileInputStream(args[1]);
+ Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
+ in.close();
+ conf.put("hdfs.config", yamlConf);
+
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("wordCounter", conf, buildTopology(args[0]));
+ Thread.sleep(120 * 1000);
+ } else if(args.length == 3) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0]));
+ } else{
+ System.out.println("Usage: TridentSequenceTopology [hdfs url] [hdfs yaml config file] <topology name>");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-hive-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-hive-examples/pom.xml b/examples/storm-hive-examples/pom.xml
new file mode 100644
index 0000000..85d7bce
--- /dev/null
+++ b/examples/storm-hive-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-hive-examples</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hive</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
new file mode 100644
index 0000000..781a539
--- /dev/null
+++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Example topology that reads pipe-delimited records from a local file and streams them into a
+ * Hive table through {@link HiveBolt}, partitioning by time.
+ */
+public class BucketTestHiveTopology {
+    static final String USER_SPOUT_ID = "user-spout";
+    static final String BOLT_ID = "my-hive-bolt";
+    static final String TOPOLOGY_NAME = "hive-test-topology1";
+
+    /** Number of mandatory positional arguments (metastoreURI .. workers). */
+    private static final int REQUIRED_ARG_COUNT = 7;
+
+    /** Column names shared by the Hive mapper and the spout's declared output fields. */
+    private static final String[] COLUMN_NAMES = { "ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk",
+        "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk",
+        "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
+        "ss_wholesale_cost", "ss_list_price", "ss_sales_price",
+        "ss_ext_discount_amt", "ss_ext_sales_price",
+        "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax",
+        "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax",
+        "ss_net_profit" };
+
+    /**
+     * Entry point.
+     *
+     * <p>With exactly the seven mandatory arguments the topology runs in a local cluster for
+     * 20 seconds; with a topology name as eighth argument it is submitted to a cluster. Kerberos
+     * settings are applied only when all ten arguments are present.
+     *
+     * @param args {@code metastoreURI dbName tableName dataFileLocation hiveBatchSize
+     *             hiveTickTupleIntervalSecs workers [topologyName] [keytab file] [principal name]}
+     * @throws Exception if a numeric argument is malformed or submission fails
+     */
+    public static void main(String[] args) throws Exception {
+        if ((args == null) || (args.length < REQUIRED_ARG_COUNT)) {
+            System.out.println("Usage: BucketTestHiveTopology metastoreURI "
+                + "dbName tableName dataFileLocation hiveBatchSize "
+                + "hiveTickTupleIntervalSecs workers [topologyName] [keytab file]"
+                + " [principal name] ");
+            System.exit(1);
+        }
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        String sourceFileLocation = args[3];
+        int hiveBatchSize = Integer.parseInt(args[4]);
+        int hiveTickTupleIntervalSecs = Integer.parseInt(args[5]);
+        int workers = Integer.parseInt(args[6]);
+
+        Config config = new Config();
+        config.setNumWorkers(workers);
+        UserDataSpout spout = new UserDataSpout().withDataFile(sourceFileLocation);
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(COLUMN_NAMES)).withTimeAsPartitionField("yyyy/MM/dd");
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
+            .withTxnsPerBatch(10)
+            .withBatchSize(hiveBatchSize);
+        // The tick tuple interval is only overridden when positive because it most likely
+        // affects Storm metrics; it is a mandatory argument since arguments are positional.
+        if (hiveTickTupleIntervalSecs > 0) {
+            hiveOptions.withTickTupleInterval(hiveTickTupleIntervalSecs);
+        }
+        if (args.length == 10) {
+            hiveOptions.withKerberosKeytab(args[8]).withKerberosPrincipal(args[9]);
+        }
+        HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(USER_SPOUT_ID, spout, 1);
+        // UserDataSpout --> HiveBolt
+        builder.setBolt(BOLT_ID, hiveBolt, 14)
+            .shuffleGrouping(USER_SPOUT_ID);
+        // Seven arguments means no topology name was given: run locally. The previous check
+        // (args.length == 6) could never be true after the guard above, and a seven-argument
+        // invocation fell through to args[7] and crashed.
+        if (args.length == REQUIRED_ARG_COUNT) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+            waitForSeconds(20);
+            cluster.killTopology(TOPOLOGY_NAME);
+            System.out.println("cluster begin to shutdown");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else {
+            StormSubmitter.submitTopology(args[7], config, builder.createTopology());
+        }
+    }
+
+    /**
+     * Sleeps for the given number of seconds. If the thread is interrupted the sleep ends early
+     * and the interrupt flag is restored for the caller.
+     *
+     * @param seconds how long to wait
+     */
+    public static void waitForSeconds(int seconds) {
+        try {
+            // Multiply as long so large values cannot overflow int.
+            Thread.sleep(seconds * 1000L);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Spout that emits one tuple per line of a pipe-delimited data file and keeps emitted tuples
+     * in {@code pending} until they are acked so that failed tuples can be re-emitted.
+     */
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String filePath;
+        private BufferedReader br;
+        private int count = 0;
+        private long total = 0L;
+
+        /**
+         * Sets the path of the data file to read.
+         *
+         * @param filePath path of the pipe-delimited input file
+         * @return this spout, for chaining
+         */
+        public UserDataSpout withDataFile(String filePath) {
+            this.filePath = filePath;
+            return this;
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields(COLUMN_NAMES));
+        }
+
+        /**
+         * Opens the data file.
+         *
+         * @throws RuntimeException if the file cannot be opened; failing here avoids a
+         *         NullPointerException on every later {@link #nextTuple()} call
+         */
+        @Override
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+            try {
+                this.br = new BufferedReader(new FileReader(new File(this.filePath)));
+            } catch (IOException ex) {
+                throw new RuntimeException("Unable to open data file " + this.filePath, ex);
+            }
+        }
+
+        /** Reads the next line, if any, and emits it as a tuple anchored by a random message id. */
+        @Override
+        public void nextTuple() {
+            try {
+                String line = br.readLine();
+                if (line == null) {
+                    // End of file: nothing more to emit.
+                    return;
+                }
+                System.out.println("*********" + line);
+                String[] values = line.split("\\|", -1);
+                // The split yields an extra empty string at the end; trim to the declared
+                // column count. Lines with fewer fields are padded with null by copyOfRange.
+                values = Arrays.copyOfRange(values, 0, COLUMN_NAMES.length);
+                Values tupleValues = new Values(values);
+                UUID msgId = UUID.randomUUID();
+                this.pending.put(msgId, tupleValues);
+                this.collector.emit(tupleValues, msgId);
+                count++;
+                total++;
+                if (count > 1000) {
+                    count = 0;
+                    System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+                }
+            } catch (IOException ex) {
+                ex.printStackTrace();
+            }
+        }
+
+        @Override
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        /** Re-emits the failed tuple if it is still pending; unknown ids are ignored. */
+        @Override
+        public void fail(Object msgId) {
+            Values tupleValues = this.pending.get(msgId);
+            if (tupleValues == null) {
+                // Nothing recorded for this id, so there is nothing to replay.
+                return;
+            }
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(tupleValues, msgId);
+        }
+
+        /** Closes the data file reader opened in {@code open}. */
+        @Override
+        public void close() {
+            if (br != null) {
+                try {
+                    br.close();
+                } catch (IOException ex) {
+                    ex.printStackTrace();
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java
new file mode 100644
index 0000000..8b61d5e
--- /dev/null
+++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class HiveTopology {
+ static final String USER_SPOUT_ID = "user-spout";
+ static final String BOLT_ID = "my-hive-bolt";
+ static final String TOPOLOGY_NAME = "hive-test-topology1";
+
+ public static void main(String[] args) throws Exception {
+ String metaStoreURI = args[0];
+ String dbName = args[1];
+ String tblName = args[2];
+ String[] colNames = {"id","name","phone","street","city","state"};
+ Config config = new Config();
+ config.setNumWorkers(1);
+ UserDataSpout spout = new UserDataSpout();
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withTimeAsPartitionField("yyyy/MM/dd/hh")
+ .withColumnFields(new Fields(colNames));
+ HiveOptions hiveOptions;
+ if (args.length == 6) {
+ hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(10)
+ .withBatchSize(100)
+ .withIdleTimeout(10)
+ .withKerberosKeytab(args[4])
+ .withKerberosPrincipal(args[5]);
+ } else {
+ hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(10)
+ .withBatchSize(100)
+ .withIdleTimeout(10)
+ .withMaxOpenConnections(1);
+ }
+
+ HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(USER_SPOUT_ID, spout, 1);
+ // SentenceSpout --> MyBolt
+ builder.setBolt(BOLT_ID, hiveBolt, 1)
+ .shuffleGrouping(USER_SPOUT_ID);
+ if (args.length == 3) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+ waitForSeconds(20);
+ cluster.killTopology(TOPOLOGY_NAME);
+ System.out.println("cluster begin to shutdown");
+ cluster.shutdown();
+ System.out.println("cluster shutdown");
+ System.exit(0);
+ } else if(args.length >= 4) {
+ StormSubmitter.submitTopology(args[3], config, builder.createTopology());
+ } else {
+ System.out.println("Usage: HiveTopology metastoreURI dbName tableName [topologyNamey] [keytab file] [principal name]");
+ }
+ }
+
+ public static void waitForSeconds(int seconds) {
+ try {
+ Thread.sleep(seconds * 1000);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ public static class UserDataSpout extends BaseRichSpout {
+ private ConcurrentHashMap<UUID, Values> pending;
+ private SpoutOutputCollector collector;
+ private String[] sentences = {
+ "1,user1,123456,street1,sunnyvale,ca",
+ "2,user2,123456,street2,sunnyvale,ca",
+ "3,user3,123456,street3,san jose,ca",
+ "4,user4,123456,street4,san jose,ca",
+ };
+ private int index = 0;
+ private int count = 0;
+ private long total = 0L;
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id","name","phone","street","city","state"));
+ }
+
+ public void open(Map config, TopologyContext context,
+ SpoutOutputCollector collector) {
+ this.collector = collector;
+ this.pending = new ConcurrentHashMap<UUID, Values>();
+ }
+
+ public void nextTuple() {
+ String[] user = sentences[index].split(",");
+ Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
+ UUID msgId = UUID.randomUUID();
+ this.pending.put(msgId, values);
+ this.collector.emit(values, msgId);
+ index++;
+ if (index >= sentences.length) {
+ index = 0;
+ }
+ count++;
+ total++;
+ if(count > 1000){
+ count = 0;
+ System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+ }
+ Thread.yield();
+ }
+
+ public void ack(Object msgId) {
+ this.pending.remove(msgId);
+ }
+
+ public void fail(Object msgId) {
+ System.out.println("**** RESENDING FAILED TUPLE");
+ this.collector.emit(this.pending.get(msgId), msgId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
----------------------------------------------------------------------
diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
new file mode 100644
index 0000000..a52c490
--- /dev/null
+++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Example topology that streams a small, repeating set of user records into a Hive table
+ * through {@link HiveBolt}, partitioned by the {@code city} and {@code state} fields.
+ */
+public class HiveTopologyPartitioned {
+    static final String USER_SPOUT_ID = "hive-user-spout-partitioned";
+    static final String BOLT_ID = "my-hive-bolt-partitioned";
+    static final String TOPOLOGY_NAME = "hive-test-topology-partitioned";
+
+    /**
+     * Entry point.
+     *
+     * <p>With three arguments the topology runs in a local cluster for 20 seconds; with four or
+     * more it is submitted under {@code args[3]}. Kerberos settings are applied only when exactly
+     * six arguments are given. Fewer than three arguments prints the usage message.
+     *
+     * @param args {@code metastoreURI dbName tableName [topologyName] [keytab file] [principal name]}
+     * @throws Exception if topology submission fails
+     */
+    public static void main(String[] args) throws Exception {
+        // Validate before reading args[0..2]; previously the usage branch was unreachable
+        // because a short argument list failed with ArrayIndexOutOfBoundsException first.
+        if (args == null || args.length < 3) {
+            System.out.println("Usage: HiveTopologyPartitioned metastoreURI dbName tableName [topologyName] [keytab file] [principal name]");
+            return;
+        }
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        String[] partNames = {"city","state"};
+        String[] colNames = {"id","name","phone","street"};
+        Config config = new Config();
+        config.setNumWorkers(1);
+        UserDataSpout spout = new UserDataSpout();
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
+            .withTxnsPerBatch(10)
+            .withBatchSize(1000)
+            .withIdleTimeout(10);
+        if (args.length == 6) {
+            hiveOptions.withKerberosKeytab(args[4])
+                .withKerberosPrincipal(args[5]);
+        }
+
+        HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(USER_SPOUT_ID, spout, 1);
+        // UserDataSpout --> HiveBolt
+        builder.setBolt(BOLT_ID, hiveBolt, 1)
+            .shuffleGrouping(USER_SPOUT_ID);
+        if (args.length == 3) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+            waitForSeconds(20);
+            cluster.killTopology(TOPOLOGY_NAME);
+            System.out.println("cluster begin to shutdown");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else {
+            StormSubmitter.submitTopology(args[3], config, builder.createTopology());
+        }
+    }
+
+    /**
+     * Sleeps for the given number of seconds. If the thread is interrupted the sleep ends early
+     * and the interrupt flag is restored for the caller.
+     *
+     * @param seconds how long to wait
+     */
+    public static void waitForSeconds(int seconds) {
+        try {
+            // Multiply as long so large values cannot overflow int.
+            Thread.sleep(seconds * 1000L);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Spout that cycles through a fixed list of comma-separated user records and keeps emitted
+     * tuples in {@code pending} until they are acked so that failed tuples can be re-emitted.
+     */
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sentences = {
+            "1,user1,123456,street1,sunnyvale,ca",
+            "2,user2,123456,street2,sunnyvale,ca",
+            "3,user3,123456,street3,san jose,ca",
+            "4,user4,123456,street4,san jose,ca",
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id","name","phone","street","city","state"));
+        }
+
+        @Override
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        /**
+         * Emits the next sample record, wrapping around at the end of the list, and pauses for
+         * one second after every 1001 emitted tuples.
+         */
+        @Override
+        public void nextTuple() {
+            String[] user = sentences[index].split(",");
+            Values values = new Values(Integer.parseInt(user[0]), user[1], user[2], user[3], user[4], user[5]);
+            UUID msgId = UUID.randomUUID();
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sentences.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if (count > 1000) {
+                Utils.sleep(1000);
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+        }
+
+        @Override
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        /** Re-emits the failed tuple if it is still pending; unknown ids are ignored. */
+        @Override
+        public void fail(Object msgId) {
+            Values values = this.pending.get(msgId);
+            if (values == null) {
+                // Nothing recorded for this id, so there is nothing to replay.
+                return;
+            }
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(values, msgId);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java
new file mode 100644
index 0000000..86a35e6
--- /dev/null
+++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.trident;
+
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hooks.SubmitterHookException;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Example Trident topology that persists a repeating set of user records into a Hive table
+ * through {@link HiveStateFactory}, partitioned by the {@code city} and {@code state} fields.
+ */
+public class TridentHiveTopology {
+    private static final Logger LOG = LoggerFactory.getLogger(TridentHiveTopology.class);
+
+    /** Name used for both submitting and killing the topology in local mode. */
+    private static final String LOCAL_TOPOLOGY_NAME = "tridentHiveTopology";
+
+    /**
+     * Builds the Trident topology.
+     *
+     * @param metaStoreURI Hive metastore URI passed to {@link HiveOptions}
+     * @param dbName       target database name
+     * @param tblName      target table name
+     * @param keytab       Kerberos keytab path as a {@code String}, or {@code null}
+     * @param principal    Kerberos principal as a {@code String}, or {@code null}
+     * @return the assembled topology; Kerberos options are set only when both
+     *         {@code keytab} and {@code principal} are non-null
+     */
+    public static StormTopology buildTopology(String metaStoreURI, String dbName, String tblName, Object keytab, Object principal) {
+        int batchSize = 100;
+        FixedBatchSpout spout = new FixedBatchSpout(batchSize);
+        spout.setCycle(true);
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("hiveTridentspout1", spout);
+        String[] partNames = {"city","state"};
+        String[] colNames = {"id","name","phone","street"};
+        Fields hiveFields = new Fields("id","name","phone","street","city","state");
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
+            .withTxnsPerBatch(10)
+            .withBatchSize(batchSize)
+            .withIdleTimeout(10)
+            .withCallTimeout(30000);
+        if (keytab != null && principal != null) {
+            hiveOptions.withKerberosKeytab((String) keytab)
+                .withKerberosPrincipal((String) principal);
+        }
+        StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
+        // The returned TridentState is not needed here; the call registers the persist step.
+        stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
+        return topology.build();
+    }
+
+    /**
+     * Sleeps for the given number of seconds. If the thread is interrupted the sleep ends early
+     * and the interrupt flag is restored for the caller.
+     *
+     * @param seconds how long to wait
+     */
+    public static void waitForSeconds(int seconds) {
+        try {
+            // Multiply as long so large values cannot overflow int.
+            Thread.sleep(seconds * 1000L);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Entry point.
+     *
+     * <p>Three arguments run the topology in a local cluster for 60 seconds; four submit it
+     * under {@code args[3]}; six submit it with Kerberos keytab and principal. Any other
+     * argument count logs the usage message.
+     *
+     * @param args {@code metastoreURI dbName tableName [topologyName] [keytab file] [principal name]}
+     */
+    public static void main(String[] args) {
+        // Validate before reading args[0..2]; previously a short argument list failed with
+        // ArrayIndexOutOfBoundsException before the usage branch could be reached.
+        if (args == null || (args.length != 3 && args.length != 4 && args.length != 6)) {
+            LOG.info("Usage: TridentHiveTopology metastoreURI dbName tableName [topologyName] [keytab file] [principal name]");
+            return;
+        }
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (args.length == 3) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, buildTopology(metaStoreURI, dbName, tblName, null, null));
+            LOG.info("waiting for 60 seconds");
+            waitForSeconds(60);
+            LOG.info("killing topology");
+            // Must match the submitted name; the earlier literal "tridenHiveTopology" did not.
+            cluster.killTopology(LOCAL_TOPOLOGY_NAME);
+            LOG.info("cluster begin to shutdown");
+            cluster.shutdown();
+            LOG.info("cluster shutdown");
+            System.exit(0);
+        } else {
+            String keytab = (args.length == 6) ? args[4] : null;
+            String principal = (args.length == 6) ? args[5] : null;
+            try {
+                StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName, keytab, principal));
+            } catch (SubmitterHookException e) {
+                LOG.warn("Topology is submitted but invoking ISubmitterHook failed", e);
+            } catch (Exception e) {
+                LOG.warn("Failed to submit topology ", e);
+            }
+        }
+    }
+
+    /**
+     * Batch spout that emits batches of up to {@code maxBatchSize} tuples drawn from a fixed
+     * list of user records. Each batch is cached by id until acked so that a re-emitted batch
+     * contains the same tuples.
+     */
+    public static class FixedBatchSpout implements IBatchSpout {
+        int maxBatchSize;
+        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+        private Values[] outputs = {
+            new Values("1","user1","123456","street1","sunnyvale","ca"),
+            new Values("2","user2","123456","street2","sunnyvale","ca"),
+            new Values("3","user3","123456","street3","san jose","ca"),
+            new Values("4","user4","123456","street4","san jose","ca"),
+        };
+        private int index = 0;
+        boolean cycle = false;
+
+        /**
+         * @param maxBatchSize maximum number of tuples per batch
+         */
+        public FixedBatchSpout(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+        }
+
+        /**
+         * @param cycle {@code true} to start over from the first record once all records have
+         *              been emitted; {@code false} to stop emitting after the last record
+         */
+        public void setCycle(boolean cycle) {
+            this.cycle = cycle;
+        }
+
+        @Override
+        public Fields getOutputFields() {
+            return new Fields("id","name","phone","street","city","state");
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context) {
+            index = 0;
+        }
+
+        /**
+         * Emits the batch for {@code batchId}, building and caching it on first use.
+         *
+         * <p>When cycling is enabled the record list wraps around until the batch is full.
+         * When it is disabled the batch ends at the last record and later batches are empty;
+         * previously the list wrapped unconditionally, so the flag had no effect.
+         */
+        @Override
+        public void emitBatch(long batchId, TridentCollector collector) {
+            List<List<Object>> batch = this.batches.get(batchId);
+            if (batch == null) {
+                batch = new ArrayList<List<Object>>();
+                for (int i = 0; i < maxBatchSize; index++, i++) {
+                    if (index >= outputs.length) {
+                        if (!cycle) {
+                            break;
+                        }
+                        index = 0;
+                    }
+                    batch.add(outputs[index]);
+                }
+                this.batches.put(batchId, batch);
+            }
+            for (List<Object> list : batch) {
+                collector.emit(list);
+            }
+        }
+
+        @Override
+        public void ack(long batchId) {
+            this.batches.remove(batchId);
+        }
+
+        @Override
+        public void close() {
+        }
+
+        /** Restricts this spout to a maximum task parallelism of one. */
+        @Override
+        public Map getComponentConfiguration() {
+            Config conf = new Config();
+            conf.setMaxTaskParallelism(1);
+            return conf;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-jdbc-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-jdbc-examples/pom.xml b/examples/storm-jdbc-examples/pom.xml
new file mode 100644
index 0000000..d8fe96f
--- /dev/null
+++ b/examples/storm-jdbc-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-jdbc-examples</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-jdbc</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/spout/UserSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/spout/UserSpout.java b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/spout/UserSpout.java
new file mode 100644
index 0000000..fdcd053
--- /dev/null
+++ b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/spout/UserSpout.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import com.google.common.collect.Lists;
+
+import java.util.*;
+
+/**
+ * Example spout that emits one randomly chosen row from {@link #rows} on every
+ * call to {@link #nextTuple()}. Tuples are emitted without a message id, so
+ * {@link #ack(Object)} and {@link #fail(Object)} have nothing to do.
+ */
+public class UserSpout implements IRichSpout {
+    boolean isDistributed;
+    SpoutOutputCollector collector;
+    /** Sample rows in the order (user_id, user_name, create_date). */
+    public static final List<Values> rows = Lists.newArrayList(
+            new Values(1,"peter",System.currentTimeMillis()),
+            new Values(2,"bob",System.currentTimeMillis()),
+            new Values(3,"alice",System.currentTimeMillis()));
+    /** Created in open() and reused, instead of allocating a generator per tuple. */
+    private transient Random random;
+
+    public UserSpout() {
+        this(true);
+    }
+
+    public UserSpout(boolean isDistributed) {
+        this.isDistributed = isDistributed;
+    }
+
+    public boolean isDistributed() {
+        return this.isDistributed;
+    }
+
+    /** Keeps the collector used for emitting and creates the random generator. */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.random = new Random();
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    /** Emits one row picked uniformly from all entries of {@link #rows}. */
+    @Override
+    public void nextTuple() {
+        // nextInt's bound is exclusive: rows.size() (not size() - 1) is required
+        // so that the last row can be selected as well.
+        final Values row = rows.get(random.nextInt(rows.size()));
+        this.collector.emit(row);
+        Thread.yield();
+    }
+
+    @Override
+    public void ack(Object msgId) {
+
+    }
+
+    @Override
+    public void fail(Object msgId) {
+
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("user_id","user_name","create_date"));
+    }
+
+    @Override
+    public void activate() {
+    }
+
+    @Override
+    public void deactivate() {
+    }
+
+    /** Returns null: this spout contributes no component-specific configuration. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
new file mode 100644
index 0000000..ec7ca36
--- /dev/null
+++ b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.ConnectionProvider;
+import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
+import org.apache.storm.jdbc.common.JdbcClient;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
+import org.apache.storm.jdbc.spout.UserSpout;
+import org.apache.storm.LocalCluster;
+
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Shared driver for the JDBC example topologies. It reads the data source
+ * settings from the command line, (re)creates the example tables, prepares the
+ * spout, mappers and connection provider used by subclasses, and then runs the
+ * topology returned by {@link #getTopology()} either in a local cluster
+ * (4 arguments) or by submitting it under the given name (5 arguments).
+ */
+public abstract class AbstractUserTopology {
+    private static final List<String> setupSqls = Lists.newArrayList(
+            "drop table if exists user",
+            "drop table if exists department",
+            "drop table if exists user_department",
+            "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
+            "create table if not exists department (dept_id integer, dept_name varchar(100))",
+            "create table if not exists user_department (user_id integer, dept_id integer)",
+            "insert into department values (1, 'R&D')",
+            "insert into department values (2, 'Finance')",
+            "insert into department values (3, 'HR')",
+            "insert into department values (4, 'Sales')",
+            "insert into user_department values (1, 1)",
+            "insert into user_department values (2, 2)",
+            "insert into user_department values (3, 3)",
+            "insert into user_department values (4, 4)"
+    );
+    protected UserSpout userSpout;
+    protected JdbcMapper jdbcMapper;
+    protected JdbcLookupMapper jdbcLookupMapper;
+    protected ConnectionProvider connectionProvider;
+
+    protected static final String TABLE_NAME = "user";
+    protected static final String JDBC_CONF = "jdbc.conf";
+    protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
+            " and user_department.user_id = ?";
+
+    /**
+     * Sets up the database and runs the topology.
+     *
+     * @param args {@code <dataSourceClassName> <dataSource.url> <user> <password> [topology name]};
+     *             without a topology name the topology runs in a local cluster for 30 seconds
+     * @throws Exception if the database setup or the topology submission fails
+     */
+    public void execute(String[] args) throws Exception {
+        if (args.length != 4 && args.length != 5) {
+            System.out.println("Usage: " + this.getClass().getSimpleName() + " <dataSourceClassName> <dataSource.url> "
+                    + "<user> <password> [topology name]");
+            System.exit(-1);
+        }
+        Map<String, Object> map = Maps.newHashMap();
+        map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
+        map.put("dataSource.user", args[2]);//root
+        // The password is the fourth argument in both modes; it must also be set
+        // when a topology name follows it, otherwise cluster runs connect without one.
+        map.put("dataSource.password", args[3]);//password
+
+        Config config = new Config();
+        config.put(JDBC_CONF, map);
+
+        // Short-lived provider used only for the setup statements and for building
+        // the table-based mapper; it is released even when a statement fails.
+        ConnectionProvider setupProvider = new HikariCPConnectionProvider(map);
+        setupProvider.prepare();
+        try {
+            int queryTimeoutSecs = 60;
+            JdbcClient jdbcClient = new JdbcClient(setupProvider, queryTimeoutSecs);
+            for (String sql : setupSqls) {
+                jdbcClient.executeSql(sql);
+            }
+            this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, setupProvider);
+        } finally {
+            setupProvider.cleanup();
+        }
+
+        this.userSpout = new UserSpout();
+        Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
+        List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
+        this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+        // Fresh, not yet prepared provider handed to the topology components.
+        this.connectionProvider = new HikariCPConnectionProvider(map);
+        if (args.length == 4) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, getTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else {
+            StormSubmitter.submitTopology(args[4], config, getTopology());
+        }
+    }
+
+    /**
+     * Builds the topology to run; called by {@link #execute(String[])} after the
+     * protected fields have been initialised.
+     */
+    public abstract StormTopology getTopology();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
new file mode 100644
index 0000000..1915219
--- /dev/null
+++ b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import com.google.common.collect.Lists;
+import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
+import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+
+import java.sql.Types;
+import java.util.List;
+
+
+/**
+ * Core-API example: rows from the user spout are enriched with a department
+ * name by a lookup bolt and then written to the user table by an insert bolt.
+ */
+public class UserPersistanceTopology extends AbstractUserTopology {
+    private static final String USER_SPOUT = "USER_SPOUT";
+    private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+    private static final String PERSISTANCE_BOLT = "PERSISTANCE_BOLT";
+
+    public static void main(String[] args) throws Exception {
+        UserPersistanceTopology example = new UserPersistanceTopology();
+        example.execute(args);
+    }
+
+    /**
+     * Wires spout, lookup bolt and insert bolt together, each with parallelism 1.
+     */
+    @Override
+    public StormTopology getTopology() {
+        JdbcLookupBolt lookup =
+                new JdbcLookupBolt(this.connectionProvider, SELECT_QUERY, this.jdbcLookupMapper);
+
+        // A custom insert query needs an explicit column schema, listed in the
+        // same order as the query's placeholders.
+        List<Column> insertColumns = Lists.newArrayList(
+                new Column("create_date", Types.DATE),
+                new Column("dept_name", Types.VARCHAR),
+                new Column("user_id", Types.INTEGER),
+                new Column("user_name", Types.VARCHAR));
+        JdbcMapper insertMapper = new SimpleJdbcMapper(insertColumns);
+        JdbcInsertBolt insert = new JdbcInsertBolt(this.connectionProvider, insertMapper)
+                .withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)");
+
+        // USER_SPOUT ==> LOOKUP_BOLT ==> PERSISTANCE_BOLT
+        TopologyBuilder wiring = new TopologyBuilder();
+        wiring.setSpout(USER_SPOUT, this.userSpout, 1);
+        wiring.setBolt(LOOKUP_BOLT, lookup, 1).shuffleGrouping(USER_SPOUT);
+        wiring.setBolt(PERSISTANCE_BOLT, insert, 1).shuffleGrouping(LOOKUP_BOLT);
+        return wiring.createTopology();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
new file mode 100644
index 0000000..11269c3
--- /dev/null
+++ b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import com.google.common.collect.Lists;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
+import org.apache.storm.jdbc.spout.UserSpout;
+import org.apache.storm.jdbc.trident.state.JdbcQuery;
+import org.apache.storm.jdbc.trident.state.JdbcState;
+import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
+import org.apache.storm.jdbc.trident.state.JdbcUpdater;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+
+import java.sql.Types;
+
+/**
+ * Trident example: user rows are enriched with a department name through a
+ * state query and then persisted through the same JDBC state factory.
+ */
+public class UserPersistanceTridentTopology extends AbstractUserTopology {
+
+    public static void main(String[] args) throws Exception {
+        UserPersistanceTridentTopology example = new UserPersistanceTridentTopology();
+        example.execute(args);
+    }
+
+    /**
+     * Builds the Trident topology: spout, department lookup, then persist.
+     */
+    @Override
+    public StormTopology getTopology() {
+        TridentTopology topology = new TridentTopology();
+
+        // Looks up dept_name using user_id as the only query parameter.
+        SimpleJdbcLookupMapper deptLookup = new SimpleJdbcLookupMapper(
+                new Fields("dept_name"),
+                Lists.newArrayList(new Column("user_id", Types.INTEGER)));
+
+        JdbcState.Options stateOptions = new JdbcState.Options()
+                .withConnectionProvider(connectionProvider)
+                .withMapper(this.jdbcMapper)
+                .withJdbcLookupMapper(deptLookup)
+                .withTableName(TABLE_NAME)
+                .withSelectQuery(SELECT_QUERY);
+        JdbcStateFactory stateFactory = new JdbcStateFactory(stateOptions);
+
+        Stream users = topology.newStream("userSpout", new UserSpout());
+        TridentState departments = topology.newStaticState(stateFactory);
+
+        Fields spoutFields = new Fields("user_id","user_name","create_date");
+        Fields persistedFields = new Fields("user_id","user_name","dept_name","create_date");
+        users.stateQuery(departments, spoutFields, new JdbcQuery(), new Fields("dept_name"))
+                .partitionPersist(stateFactory, persistedFields, new JdbcUpdater(), new Fields());
+        return topology.build();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-kafka-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/pom.xml b/examples/storm-kafka-examples/pom.xml
new file mode 100644
index 0000000..8e2727c
--- /dev/null
+++ b/examples/storm-kafka-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-kafka-examples</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/TridentKafkaTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/TridentKafkaTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/TridentKafkaTopology.java
new file mode 100644
index 0000000..fdc6752
--- /dev/null
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/TridentKafkaTopology.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
+import org.apache.storm.kafka.trident.TridentKafkaUpdater;
+import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+
+import java.util.Properties;
+
+/**
+ * Example Trident topology that publishes a small, cycling set of
+ * (word, count) tuples to the Kafka topic "test".
+ */
+public class TridentKafkaTopology {
+
+    /**
+     * Builds the topology: a cycling fixed-batch spout feeding a Kafka state.
+     *
+     * @param brokerConnectionString value used for the producer's "bootstrap.servers" property
+     * @return the topology ready for submission
+     */
+    private static StormTopology buildTopology(String brokerConnectionString) {
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", "1"),
+                new Values("trident", "1"),
+                new Values("needs", "1"),
+                new Values("javadoc", "1")
+        );
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", brokerConnectionString);
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+        // "word" is used as the message key field and "count" as the message field.
+        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+                .withProducerProperties(props)
+                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
+
+        return topology.build();
+    }
+
+    /**
+     * To run this topology ensure you have a kafka broker running and provide connection string to broker as argument.
+     * Create a topic test with command line,
+     * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
+     *
+     * run this program and run the kafka consumer:
+     * kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
+     *
+     * you should see the messages flowing through.
+     *
+     * @param args args[0] is the kafka broker connection string, e.g. localhost:9092
+     * @throws Exception if the local cluster run fails
+     */
+    public static void main(String[] args) throws Exception {
+        if(args.length < 1) {
+            System.out.println("Please provide kafka broker url ,e.g. localhost:9092");
+            // Stop here: continuing would fail on args[0] below.
+            System.exit(-1);
+        }
+
+        LocalCluster cluster = new LocalCluster();
+        try {
+            cluster.submitTopology("wordCounter", new Config(), buildTopology(args[0]));
+            Thread.sleep(60 * 1000);
+            cluster.killTopology("wordCounter");
+        } finally {
+            // Always release the in-process cluster, even when the run fails.
+            cluster.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-mongodb-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/pom.xml b/examples/storm-mongodb-examples/pom.xml
new file mode 100644
index 0000000..c6e9eb2
--- /dev/null
+++ b/examples/storm-mongodb-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-mongodb-examples</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-mongodb</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
new file mode 100644
index 0000000..366acf4
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.mongodb.bolt.MongoInsertBolt;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class InsertWordCount {
+ private static final String WORD_SPOUT = "WORD_SPOUT";
+ private static final String COUNT_BOLT = "COUNT_BOLT";
+ private static final String INSERT_BOLT = "INSERT_BOLT";
+
+ private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test";
+ private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
+
+
+ public static void main(String[] args) throws Exception {
+ Config config = new Config();
+
+ String url = TEST_MONGODB_URL;
+ String collectionName = TEST_MONGODB_COLLECTION_NAME;
+
+ if (args.length >= 2) {
+ url = args[0];
+ collectionName = args[1];
+ }
+
+ WordSpout spout = new WordSpout();
+ WordCounter bolt = new WordCounter();
+
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
+
+ // wordSpout ==> countBolt ==> MongoInsertBolt
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout(WORD_SPOUT, spout, 1);
+ builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(INSERT_BOLT, insertBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, builder.createTopology());
+ Thread.sleep(30000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ System.exit(0);
+ } else if (args.length == 3) {
+ StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+ } else{
+ System.out.println("Usage: InsertWordCount <mongodb url> <mongodb collection> [topology name]");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
new file mode 100644
index 0000000..7895f35
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.mongodb.bolt.MongoInsertBolt;
+import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class UpdateWordCount {
+ private static final String WORD_SPOUT = "WORD_SPOUT";
+ private static final String COUNT_BOLT = "COUNT_BOLT";
+ private static final String UPDATE_BOLT = "UPDATE_BOLT";
+
+ private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test";
+ private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
+
+
+ public static void main(String[] args) throws Exception {
+ Config config = new Config();
+
+ String url = TEST_MONGODB_URL;
+ String collectionName = TEST_MONGODB_COLLECTION_NAME;
+
+ if (args.length >= 2) {
+ url = args[0];
+ collectionName = args[1];
+ }
+
+ WordSpout spout = new WordSpout();
+ WordCounter bolt = new WordCounter();
+
+ MongoMapper mapper = new SimpleMongoUpdateMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator , mapper);
+
+ //if a new document should be inserted if there are no matches to the query filter
+ //updateBolt.withUpsert(true);
+
+ // wordSpout ==> countBolt ==> MongoUpdateBolt
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout(WORD_SPOUT, spout, 1);
+ builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(UPDATE_BOLT, updateBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, builder.createTopology());
+ Thread.sleep(30000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ System.exit(0);
+ } else if (args.length == 3) {
+ StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+ } else{
+ System.out.println("Usage: UpdateWordCount <mongodb url> <mongodb collection> [topology name]");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java
new file mode 100644
index 0000000..efb2d89
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+/**
+ * Bolt that keeps an in-memory running tally of how often each word has been seen.
+ *
+ * <p>For every incoming tuple it reads the {@code "word"} field, increments that word's
+ * counter and emits a {@code (word, count)} pair. The count is emitted as a
+ * {@code String}, not as a number.
+ */
+public class WordCounter implements IBasicBolt {
+    private Map<String, Integer> wordCounter = Maps.newHashMap();
+
+    /** No set-up is required; the counter map is created with the bolt instance. */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map stormConf, TopologyContext context) {
+
+    }
+
+    /**
+     * Increments the counter for the tuple's word and emits the updated count.
+     *
+     * @param input tuple carrying a string field named {@code "word"}
+     * @param collector collector used to emit the {@code (word, count)} pair
+     */
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        String word = input.getStringByField("word");
+
+        // Single lookup: a missing entry means this is the first occurrence.
+        Integer previous = wordCounter.get(word);
+        int count = (previous == null) ? 1 : previous + 1;
+
+        // Store the new total exactly once, then publish it downstream.
+        wordCounter.put(word, count);
+        collector.emit(new Values(word, String.valueOf(count)));
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void cleanup() {
+
+    }
+
+    /** Declares the two output fields, {@code "word"} and {@code "count"}. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+
+    /** Returns {@code null}: this bolt supplies no component-specific configuration. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java
new file mode 100644
index 0000000..885b1e8
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Spout that endlessly emits a randomly chosen word from {@link #words}.
+ *
+ * <p>Each tuple has a single {@code "word"} field and is emitted with a random
+ * {@link UUID} as its message id. {@link #ack(Object)} and {@link #fail(Object)}
+ * are no-ops, so failed tuples are not replayed.
+ */
+public class WordSpout implements IRichSpout {
+    boolean isDistributed;
+    SpoutOutputCollector collector;
+    public static final String[] words = new String[] { "apple", "orange", "pineapple", "banana", "watermelon" };
+
+    // Created once in open() rather than once per emitted tuple.
+    private Random rand;
+
+    /** Creates a spout with the {@code isDistributed} flag set to {@code true}. */
+    public WordSpout() {
+        this(true);
+    }
+
+    /**
+     * @param isDistributed value later reported by {@link #isDistributed()}
+     */
+    public WordSpout(boolean isDistributed) {
+        this.isDistributed = isDistributed;
+    }
+
+    /** Returns the flag passed to the constructor. */
+    public boolean isDistributed() {
+        return this.isDistributed;
+    }
+
+    /** Stores the collector and creates the random generator used by {@link #nextTuple()}. */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.rand = new Random();
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() {
+
+    }
+
+    /** Emits one randomly selected word, anchored to a fresh random message id. */
+    @Override
+    public void nextTuple() {
+        final String word = words[rand.nextInt(words.length)];
+        this.collector.emit(new Values(word), UUID.randomUUID());
+        Thread.yield();
+    }
+
+    /** Acknowledgements are ignored. */
+    @Override
+    public void ack(Object msgId) {
+
+    }
+
+    /** Failures are ignored; the tuple is not re-emitted. */
+    @Override
+    public void fail(Object msgId) {
+
+    }
+
+    /** Declares the single output field {@code "word"}. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+
+    @Override
+    public void activate() {
+    }
+
+    @Override
+    public void deactivate() {
+    }
+
+    /** Returns {@code null}: this spout supplies no component-specific configuration. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
new file mode 100644
index 0000000..44447be
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
+import org.apache.storm.mongodb.trident.state.MongoState;
+import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Example Trident topology that persists a fixed, cycling set of
+ * {@code (word, count)} tuples into a MongoDB collection.
+ */
+public class WordCountTrident {
+
+    /**
+     * Builds the topology: a cycling {@link FixedBatchSpout} whose tuples are
+     * written to MongoDB through {@code partitionPersist}.
+     *
+     * @param url MongoDB url handed to the state options
+     * @param collectionName MongoDB collection handed to the state options
+     * @return the assembled topology
+     */
+    public static StormTopology buildTopology(String url, String collectionName){
+        Fields wordCountFields = new Fields("word", "count");
+
+        // Four fixed tuples, replayed indefinitely because cycling is enabled.
+        FixedBatchSpout batchSpout = new FixedBatchSpout(wordCountFields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1));
+        batchSpout.setCycle(true);
+
+        MongoMapper documentMapper = new SimpleMongoMapper().withFields("word", "count");
+        MongoState.Options stateOptions = new MongoState.Options()
+                .withUrl(url)
+                .withCollectionName(collectionName)
+                .withMapper(documentMapper);
+        StateFactory stateFactory = new MongoStateFactory(stateOptions);
+
+        TridentTopology tridentTopology = new TridentTopology();
+        tridentTopology.newStream("spout1", batchSpout)
+                .partitionPersist(stateFactory, wordCountFields, new MongoStateUpdater(), new Fields());
+        return tridentTopology.build();
+    }
+
+    /**
+     * Entry point. With two arguments the topology runs in a local cluster for
+     * 60 seconds and the JVM then exits; with three it is submitted under the
+     * name given as the third argument; otherwise a usage line is printed.
+     *
+     * @param args {@code <mongodb url> <mongodb collection> [topology name]}
+     */
+    public static void main(String[] args) throws Exception {
+        Config topologyConf = new Config();
+        topologyConf.setMaxSpoutPending(5);
+
+        final int argCount = args.length;
+        if (argCount != 2 && argCount != 3) {
+            System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]");
+            return;
+        }
+
+        if (argCount == 3) {
+            // Remote submission under the caller-supplied topology name.
+            topologyConf.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[2], topologyConf, buildTopology(args[0], args[1]));
+            return;
+        }
+
+        // Exactly two arguments: run in-process, then tear everything down.
+        LocalCluster localCluster = new LocalCluster();
+        localCluster.submitTopology("wordCounter", topologyConf, buildTopology(args[0], args[1]));
+        Thread.sleep(60000);
+        localCluster.killTopology("wordCounter");
+        localCluster.shutdown();
+        System.exit(0);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f9ff225e/examples/storm-mqtt-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/pom.xml b/examples/storm-mqtt-examples/pom.xml
new file mode 100644
index 0000000..432e7a5
--- /dev/null
+++ b/examples/storm-mqtt-examples/pom.xml
@@ -0,0 +1,116 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>storm-mqtt-examples</artifactId>
+ <packaging>jar</packaging>
+
+ <name>storm-mqtt-examples</name>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <!-- Storm core API, declared with "provided" scope and versioned with this project. -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- The storm-mqtt module that these examples demonstrate. -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-mqtt</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Log4j 2 core and its SLF4J binding, both pinned to 2.1. -->
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>2.1</version>
+ </dependency>
+ <!-- Flux; its org.apache.storm.flux.Flux class is the main class of the shaded jar (see build section). -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- FuseSource MQTT client library. -->
+ <dependency>
+ <groupId>org.fusesource.mqtt-client</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>1.10</version>
+ </dependency>
+ <!-- ActiveMQ broker, MQTT transport and KahaDB store, all at 5.9.0. -->
+ <!-- NOTE(review): presumably used to run an embedded MQTT broker for the examples; confirm against the example sources. -->
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>5.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-mqtt</artifactId>
+ <version>5.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-kahadb-store</artifactId>
+ <version>5.9.0</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+
+ <!-- Builds a shaded jar during the "package" phase. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.4</version>
+ <configuration>
+ <!-- Also generate a dependency-reduced POM for the shaded artifact. -->
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <!-- Merges META-INF/services entries from the bundled dependencies. -->
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <!-- Sets the jar manifest's main class to the Flux launcher. -->
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.storm.flux.Flux</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>