You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/12 04:16:08 UTC
[03/10] storm git commit: STORM-1970: external project examples
refator
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/TridentFileTopology.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
deleted file mode 100644
index 8f75c45..0000000
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.trident;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.hdfs.common.rotation.MoveFileAction;
-import org.apache.storm.hdfs.trident.format.*;
-import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.state.StateFactory;
-import org.yaml.snakeyaml.Yaml;
-
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Map;
-
-public class TridentFileTopology {
-
- public static StormTopology buildTopology(String hdfsUrl){
- FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence", "key"), 1000, new Values("the cow jumped over the moon", 1l),
- new Values("the man went to the store and bought some candy", 2l), new Values("four score and seven years ago", 3l),
- new Values("how many apples can you eat", 4l), new Values("to be or not to be the person", 5l));
- spout.setCycle(true);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("spout1", spout);
-
- Fields hdfsFields = new Fields("sentence", "key");
-
- FileNameFormat fileNameFormat = new DefaultFileNameFormat()
- .withPath("/tmp/trident")
- .withPrefix("trident")
- .withExtension(".txt");
-
- RecordFormat recordFormat = new DelimitedRecordFormat()
- .withFields(hdfsFields);
-
- FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
-
- HdfsState.Options options = new HdfsState.HdfsFileOptions()
- .withFileNameFormat(fileNameFormat)
- .withRecordFormat(recordFormat)
- .withRotationPolicy(rotationPolicy)
- .withFsUrl(hdfsUrl)
- .withConfigKey("hdfs.config");
-
- StateFactory factory = new HdfsStateFactory().withOptions(options);
-
- TridentState state = stream
- .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
-
- return topology.build();
- }
-
- public static void main(String[] args) throws Exception {
- Config conf = new Config();
- conf.setMaxSpoutPending(5);
-
- Yaml yaml = new Yaml();
- InputStream in = new FileInputStream(args[1]);
- Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
- in.close();
- conf.put("hdfs.config", yamlConf);
-
- if (args.length == 2) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("wordCounter", conf, buildTopology(args[0]));
- Thread.sleep(120 * 1000);
- } else if(args.length == 3) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0]));
- } else{
- System.out.println("Usage: TridentFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
deleted file mode 100644
index 788b33c..0000000
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.trident;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.hdfs.common.rotation.MoveFileAction;
-import org.apache.storm.hdfs.trident.format.*;
-import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.state.StateFactory;
-import org.yaml.snakeyaml.Yaml;
-
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Map;
-
-public class TridentSequenceTopology {
-
- public static StormTopology buildTopology(String hdfsUrl){
- FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence", "key"), 1000, new Values("the cow jumped over the moon", 1l),
- new Values("the man went to the store and bought some candy", 2l), new Values("four score and seven years ago", 3l),
- new Values("how many apples can you eat", 4l), new Values("to be or not to be the person", 5l));
- spout.setCycle(true);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("spout1", spout);
-
- Fields hdfsFields = new Fields("sentence", "key");
-
- FileNameFormat fileNameFormat = new DefaultFileNameFormat()
- .withPath("/tmp/trident")
- .withPrefix("trident")
- .withExtension(".seq");
-
- FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
-
- HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
- .withFileNameFormat(fileNameFormat)
- .withSequenceFormat(new DefaultSequenceFormat("key", "sentence"))
- .withRotationPolicy(rotationPolicy)
- .withFsUrl(hdfsUrl)
- .withConfigKey("hdfs.config")
- .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/"));
- StateFactory factory = new HdfsStateFactory().withOptions(seqOpts);
-
- TridentState state = stream
- .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
-
- return topology.build();
- }
-
- public static void main(String[] args) throws Exception {
- Config conf = new Config();
- conf.setMaxSpoutPending(5);
-
- Yaml yaml = new Yaml();
- InputStream in = new FileInputStream(args[1]);
- Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
- in.close();
- conf.put("hdfs.config", yamlConf);
-
- if (args.length == 2) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("wordCounter", conf, buildTopology(args[0]));
- Thread.sleep(120 * 1000);
- } else if(args.length == 3) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0]));
- } else{
- System.out.println("Usage: TridentSequenceTopology [hdfs url] [hdfs yaml config file] <topology name>");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
deleted file mode 100644
index 607bd61..0000000
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hive.bolt;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.MockTupleHelpers;
-
-import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
-import org.apache.storm.hive.common.HiveOptions;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class BucketTestHiveTopology {
- static final String USER_SPOUT_ID = "user-spout";
- static final String BOLT_ID = "my-hive-bolt";
- static final String TOPOLOGY_NAME = "hive-test-topology1";
-
- public static void main(String[] args) throws Exception {
- if ((args == null) || (args.length < 7)) {
- System.out.println("Usage: BucketTestHiveTopology metastoreURI "
- + "dbName tableName dataFileLocation hiveBatchSize " +
- "hiveTickTupl]eIntervalSecs workers [topologyNamey] [keytab file]"
- + " [principal name] ");
- System.exit(1);
- }
- String metaStoreURI = args[0];
- String dbName = args[1];
- String tblName = args[2];
- String sourceFileLocation = args[3];
- Integer hiveBatchSize = Integer.parseInt(args[4]);
- Integer hiveTickTupleIntervalSecs = Integer.parseInt(args[5]);
- Integer workers = Integer.parseInt(args[6]);
- String[] colNames = { "ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk",
- "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk",
- "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
- "ss_wholesale_cost", "ss_list_price", "ss_sales_price",
- "ss_ext_discount_amt", "ss_ext_sales_price",
- "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax",
- "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax",
- "ss_net_profit" };
- Config config = new Config();
- config.setNumWorkers(workers);
- UserDataSpout spout = new UserDataSpout().withDataFile(sourceFileLocation);
- DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames)).withTimeAsPartitionField("yyyy/MM/dd");
- HiveOptions hiveOptions;
- hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
- .withTxnsPerBatch(10)
- .withBatchSize(hiveBatchSize);
- // doing below because its affecting storm metrics most likely
- // had to make tick tuple a mandatory argument since its positional
- if (hiveTickTupleIntervalSecs > 0) {
- hiveOptions.withTickTupleInterval(hiveTickTupleIntervalSecs);
- }
- if (args.length == 10) {
- hiveOptions.withKerberosKeytab(args[8]).withKerberosPrincipal(args[9]);
- }
- HiveBolt hiveBolt = new HiveBolt(hiveOptions);
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout(USER_SPOUT_ID, spout, 1);
- // SentenceSpout --> MyBolt
- builder.setBolt(BOLT_ID, hiveBolt, 14)
- .shuffleGrouping(USER_SPOUT_ID);
- if (args.length == 6) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
- waitForSeconds(20);
- cluster.killTopology(TOPOLOGY_NAME);
- System.out.println("cluster begin to shutdown");
- cluster.shutdown();
- System.out.println("cluster shutdown");
- System.exit(0);
- } else {
- StormSubmitter.submitTopology(args[7], config, builder.createTopology());
- }
- }
-
- public static void waitForSeconds(int seconds) {
- try {
- Thread.sleep(seconds * 1000);
- } catch (InterruptedException e) {
- }
- }
-
- public static class UserDataSpout extends BaseRichSpout {
- private ConcurrentHashMap<UUID, Values> pending;
- private SpoutOutputCollector collector;
- private String filePath;
- private BufferedReader br;
- private int count = 0;
- private long total = 0L;
- private String[] outputFields = { "ss_sold_date_sk", "ss_sold_time_sk",
- "ss_item_sk", "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk",
- "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number",
- "ss_quantity", "ss_wholesale_cost", "ss_list_price",
- "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price",
- "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax",
- "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax",
- "ss_net_profit" };
-
- public UserDataSpout withDataFile (String filePath) {
- this.filePath = filePath;
- return this;
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(this.outputFields));
- }
-
- public void open(Map config, TopologyContext context,
- SpoutOutputCollector collector) {
- this.collector = collector;
- this.pending = new ConcurrentHashMap<UUID, Values>();
- try {
- this.br = new BufferedReader(new FileReader(new File(this
- .filePath)));
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
-
- public void nextTuple() {
- String line;
- try {
- if ((line = br.readLine()) != null) {
- System.out.println("*********" + line);
- String[] values = line.split("\\|", -1);
- // above gives an extra empty string at the end. below
- // removes that
- values = Arrays.copyOfRange(values, 0,
- this.outputFields.length);
- Values tupleValues = new Values(values);
- UUID msgId = UUID.randomUUID();
- this.pending.put(msgId, tupleValues);
- this.collector.emit(tupleValues, msgId);
- count++;
- total++;
- if (count > 1000) {
- count = 0;
- System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
- }
- }
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
-
- public void ack(Object msgId) {
- this.pending.remove(msgId);
- }
-
- public void fail(Object msgId) {
- System.out.println("**** RESENDING FAILED TUPLE");
- this.collector.emit(this.pending.get(msgId), msgId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
deleted file mode 100644
index 4afd298..0000000
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hive.bolt;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
-import org.apache.storm.hive.common.HiveOptions;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class HiveTopology {
- static final String USER_SPOUT_ID = "user-spout";
- static final String BOLT_ID = "my-hive-bolt";
- static final String TOPOLOGY_NAME = "hive-test-topology1";
-
- public static void main(String[] args) throws Exception {
- String metaStoreURI = args[0];
- String dbName = args[1];
- String tblName = args[2];
- String[] colNames = {"id","name","phone","street","city","state"};
- Config config = new Config();
- config.setNumWorkers(1);
- UserDataSpout spout = new UserDataSpout();
- DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames));
- HiveOptions hiveOptions;
- if (args.length == 6) {
- hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
- .withTxnsPerBatch(10)
- .withBatchSize(100)
- .withIdleTimeout(10)
- .withKerberosKeytab(args[4])
- .withKerberosPrincipal(args[5]);
- } else {
- hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
- .withTxnsPerBatch(10)
- .withBatchSize(100)
- .withIdleTimeout(10)
- .withMaxOpenConnections(1);
- }
-
- HiveBolt hiveBolt = new HiveBolt(hiveOptions);
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout(USER_SPOUT_ID, spout, 1);
- // SentenceSpout --> MyBolt
- builder.setBolt(BOLT_ID, hiveBolt, 1)
- .shuffleGrouping(USER_SPOUT_ID);
- if (args.length == 3) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
- waitForSeconds(20);
- cluster.killTopology(TOPOLOGY_NAME);
- System.out.println("cluster begin to shutdown");
- cluster.shutdown();
- System.out.println("cluster shutdown");
- System.exit(0);
- } else if(args.length >= 4) {
- StormSubmitter.submitTopology(args[3], config, builder.createTopology());
- } else {
- System.out.println("Usage: HiveTopology metastoreURI dbName tableName [topologyNamey] [keytab file] [principal name]");
- }
- }
-
- public static void waitForSeconds(int seconds) {
- try {
- Thread.sleep(seconds * 1000);
- } catch (InterruptedException e) {
- }
- }
-
- public static class UserDataSpout extends BaseRichSpout {
- private ConcurrentHashMap<UUID, Values> pending;
- private SpoutOutputCollector collector;
- private String[] sentences = {
- "1,user1,123456,street1,sunnyvale,ca",
- "2,user2,123456,street2,sunnyvale,ca",
- "3,user3,123456,street3,san jose,ca",
- "4,user4,123456,street4,san jose,ca",
- };
- private int index = 0;
- private int count = 0;
- private long total = 0L;
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id","name","phone","street","city","state"));
- }
-
- public void open(Map config, TopologyContext context,
- SpoutOutputCollector collector) {
- this.collector = collector;
- this.pending = new ConcurrentHashMap<UUID, Values>();
- }
-
- public void nextTuple() {
- String[] user = sentences[index].split(",");
- Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
- UUID msgId = UUID.randomUUID();
- this.pending.put(msgId, values);
- this.collector.emit(values, msgId);
- index++;
- if (index >= sentences.length) {
- index = 0;
- }
- count++;
- total++;
- if(count > 1000){
- count = 0;
- System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
- }
- Thread.yield();
- }
-
- public void ack(Object msgId) {
- this.pending.remove(msgId);
- }
-
- public void fail(Object msgId) {
- System.out.println("**** RESENDING FAILED TUPLE");
- this.collector.emit(this.pending.get(msgId), msgId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
deleted file mode 100644
index a52c490..0000000
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hive.bolt;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
-import org.apache.storm.hive.common.HiveOptions;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class HiveTopologyPartitioned {
- static final String USER_SPOUT_ID = "hive-user-spout-partitioned";
- static final String BOLT_ID = "my-hive-bolt-partitioned";
- static final String TOPOLOGY_NAME = "hive-test-topology-partitioned";
-
- public static void main(String[] args) throws Exception {
- String metaStoreURI = args[0];
- String dbName = args[1];
- String tblName = args[2];
- String[] partNames = {"city","state"};
- String[] colNames = {"id","name","phone","street"};
- Config config = new Config();
- config.setNumWorkers(1);
- UserDataSpout spout = new UserDataSpout();
- DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames))
- .withPartitionFields(new Fields(partNames));
- HiveOptions hiveOptions;
- if (args.length == 6) {
- hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
- .withTxnsPerBatch(10)
- .withBatchSize(1000)
- .withIdleTimeout(10)
- .withKerberosKeytab(args[4])
- .withKerberosPrincipal(args[5]);
- } else {
- hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
- .withTxnsPerBatch(10)
- .withBatchSize(1000)
- .withIdleTimeout(10);
- }
-
- HiveBolt hiveBolt = new HiveBolt(hiveOptions);
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout(USER_SPOUT_ID, spout, 1);
- // SentenceSpout --> MyBolt
- builder.setBolt(BOLT_ID, hiveBolt, 1)
- .shuffleGrouping(USER_SPOUT_ID);
- if (args.length == 3) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
- waitForSeconds(20);
- cluster.killTopology(TOPOLOGY_NAME);
- System.out.println("cluster begin to shutdown");
- cluster.shutdown();
- System.out.println("cluster shutdown");
- System.exit(0);
- } else if(args.length >= 4) {
- StormSubmitter.submitTopology(args[3], config, builder.createTopology());
- } else {
- System.out.println("Usage: HiveTopologyPartitioned metastoreURI dbName tableName [topologyNamey] [keytab file] [principal name]");
- }
- }
-
- public static void waitForSeconds(int seconds) {
- try {
- Thread.sleep(seconds * 1000);
- } catch (InterruptedException e) {
- }
- }
-
- public static class UserDataSpout extends BaseRichSpout {
- private ConcurrentHashMap<UUID, Values> pending;
- private SpoutOutputCollector collector;
- private String[] sentences = {
- "1,user1,123456,street1,sunnyvale,ca",
- "2,user2,123456,street2,sunnyvale,ca",
- "3,user3,123456,street3,san jose,ca",
- "4,user4,123456,street4,san jose,ca",
- };
- private int index = 0;
- private int count = 0;
- private long total = 0L;
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id","name","phone","street","city","state"));
- }
-
- public void open(Map config, TopologyContext context,
- SpoutOutputCollector collector) {
- this.collector = collector;
- this.pending = new ConcurrentHashMap<UUID, Values>();
- }
-
- public void nextTuple() {
- String[] user = sentences[index].split(",");
- Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
- UUID msgId = UUID.randomUUID();
- this.pending.put(msgId, values);
- this.collector.emit(values, msgId);
- index++;
- if (index >= sentences.length) {
- index = 0;
- }
- count++;
- total++;
- if(count > 1000){
- Utils.sleep(1000);
- count = 0;
- System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
- }
- }
-
- public void ack(Object msgId) {
- this.pending.remove(msgId);
- }
-
- public void fail(Object msgId) {
- System.out.println("**** RESENDING FAILED TUPLE");
- this.collector.emit(this.pending.get(msgId), msgId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
deleted file mode 100644
index 86a35e6..0000000
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hive.trident;
-
-
-import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
-import org.apache.storm.hive.common.HiveOptions;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.hooks.SubmitterHookException;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.state.StateFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class TridentHiveTopology {
- private static final Logger LOG = LoggerFactory.getLogger(TridentHiveTopology.class);
-
- public static StormTopology buildTopology(String metaStoreURI, String dbName, String tblName, Object keytab, Object principal) {
- int batchSize = 100;
- FixedBatchSpout spout = new FixedBatchSpout(batchSize);
- spout.setCycle(true);
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("hiveTridentspout1",spout);
- String[] partNames = {"city","state"};
- String[] colNames = {"id","name","phone","street"};
- Fields hiveFields = new Fields("id","name","phone","street","city","state");
- DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames))
- .withPartitionFields(new Fields(partNames));
- HiveOptions hiveOptions;
- if (keytab != null && principal != null) {
- hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
- .withTxnsPerBatch(10)
- .withBatchSize(batchSize)
- .withIdleTimeout(10)
- .withCallTimeout(30000)
- .withKerberosKeytab((String)keytab)
- .withKerberosPrincipal((String)principal);
- } else {
- hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
- .withTxnsPerBatch(10)
- .withBatchSize(batchSize)
- .withCallTimeout(30000)
- .withIdleTimeout(10);
- }
- StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
- TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
- return topology.build();
- }
-
- public static void waitForSeconds(int seconds) {
- try {
- Thread.sleep(seconds * 1000);
- } catch (InterruptedException e) {
- }
- }
-
- public static void main(String[] args) {
- String metaStoreURI = args[0];
- String dbName = args[1];
- String tblName = args[2];
- Config conf = new Config();
- conf.setMaxSpoutPending(5);
- if(args.length == 3) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("tridentHiveTopology", conf, buildTopology(metaStoreURI, dbName, tblName,null,null));
- LOG.info("waiting for 60 seconds");
- waitForSeconds(60);
- LOG.info("killing topology");
- cluster.killTopology("tridenHiveTopology");
- LOG.info("cluster shutdown");
- cluster.shutdown();
- LOG.info("cluster shutdown");
- System.exit(0);
- } else if(args.length == 4) {
- try {
- StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,null,null));
- } catch(SubmitterHookException e) {
- LOG.warn("Topology is submitted but invoking ISubmitterHook failed", e);
- } catch (Exception e) {
- LOG.warn("Failed to submit topology ", e);
- }
- } else if (args.length == 6) {
- try {
- StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,args[4],args[5]));
- } catch(SubmitterHookException e) {
- LOG.warn("Topology is submitted but invoking ISubmitterHook failed", e);
- } catch (Exception e) {
- LOG.warn("Failed to submit topology ", e);
- }
- } else {
- LOG.info("Usage: TridentHiveTopology metastoreURI dbName tableName [topologyNamey]");
- }
- }
-
- public static class FixedBatchSpout implements IBatchSpout {
- int maxBatchSize;
- HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
- private Values[] outputs = {
- new Values("1","user1","123456","street1","sunnyvale","ca"),
- new Values("2","user2","123456","street2","sunnyvale","ca"),
- new Values("3","user3","123456","street3","san jose","ca"),
- new Values("4","user4","123456","street4","san jose","ca"),
- };
- private int index = 0;
- boolean cycle = false;
-
- public FixedBatchSpout(int maxBatchSize) {
- this.maxBatchSize = maxBatchSize;
- }
-
- public void setCycle(boolean cycle) {
- this.cycle = cycle;
- }
-
- @Override
- public Fields getOutputFields() {
- return new Fields("id","name","phone","street","city","state");
- }
-
- @Override
- public void open(Map conf, TopologyContext context) {
- index = 0;
- }
-
- @Override
- public void emitBatch(long batchId, TridentCollector collector) {
- List<List<Object>> batch = this.batches.get(batchId);
- if(batch == null){
- batch = new ArrayList<List<Object>>();
- if(index>=outputs.length && cycle) {
- index = 0;
- }
- for(int i=0; i < maxBatchSize; index++, i++) {
- if(index == outputs.length){
- index=0;
- }
- batch.add(outputs[index]);
- }
- this.batches.put(batchId, batch);
- }
- for(List<Object> list : batch){
- collector.emit(list);
- }
- }
-
- @Override
- public void ack(long batchId) {
- this.batches.remove(batchId);
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public Map getComponentConfiguration() {
- Config conf = new Config();
- conf.setMaxTaskParallelism(1);
- return conf;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
deleted file mode 100644
index fdcd053..0000000
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jdbc.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import com.google.common.collect.Lists;
-
-import java.util.*;
-
-public class UserSpout implements IRichSpout {
- boolean isDistributed;
- SpoutOutputCollector collector;
- public static final List<Values> rows = Lists.newArrayList(
- new Values(1,"peter",System.currentTimeMillis()),
- new Values(2,"bob",System.currentTimeMillis()),
- new Values(3,"alice",System.currentTimeMillis()));
-
- public UserSpout() {
- this(true);
- }
-
- public UserSpout(boolean isDistributed) {
- this.isDistributed = isDistributed;
- }
-
- public boolean isDistributed() {
- return this.isDistributed;
- }
-
- @SuppressWarnings("rawtypes")
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.collector = collector;
- }
-
- public void close() {
-
- }
-
- public void nextTuple() {
- final Random rand = new Random();
- final Values row = rows.get(rand.nextInt(rows.size() - 1));
- this.collector.emit(row);
- Thread.yield();
- }
-
- public void ack(Object msgId) {
-
- }
-
- public void fail(Object msgId) {
-
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("user_id","user_name","create_date"));
- }
-
- @Override
- public void activate() {
- }
-
- @Override
- public void deactivate() {
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
deleted file mode 100644
index ec7ca36..0000000
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jdbc.topology;
-
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.common.ConnectionProvider;
-import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
-import org.apache.storm.jdbc.common.JdbcClient;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
-import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
-import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
-import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
-import org.apache.storm.jdbc.spout.UserSpout;
-import org.apache.storm.LocalCluster;
-
-import java.sql.Types;
-import java.util.List;
-import java.util.Map;
-
-public abstract class AbstractUserTopology {
- private static final List<String> setupSqls = Lists.newArrayList(
- "drop table if exists user",
- "drop table if exists department",
- "drop table if exists user_department",
- "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
- "create table if not exists department (dept_id integer, dept_name varchar(100))",
- "create table if not exists user_department (user_id integer, dept_id integer)",
- "insert into department values (1, 'R&D')",
- "insert into department values (2, 'Finance')",
- "insert into department values (3, 'HR')",
- "insert into department values (4, 'Sales')",
- "insert into user_department values (1, 1)",
- "insert into user_department values (2, 2)",
- "insert into user_department values (3, 3)",
- "insert into user_department values (4, 4)"
- );
- protected UserSpout userSpout;
- protected JdbcMapper jdbcMapper;
- protected JdbcLookupMapper jdbcLookupMapper;
- protected ConnectionProvider connectionProvider;
-
- protected static final String TABLE_NAME = "user";
- protected static final String JDBC_CONF = "jdbc.conf";
- protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
- " and user_department.user_id = ?";
-
- public void execute(String[] args) throws Exception {
- if (args.length != 4 && args.length != 5) {
- System.out.println("Usage: " + this.getClass().getSimpleName() + " <dataSourceClassName> <dataSource.url> "
- + "<user> <password> [topology name]");
- System.exit(-1);
- }
- Map map = Maps.newHashMap();
- map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
- map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
- map.put("dataSource.user", args[2]);//root
-
- if(args.length == 4) {
- map.put("dataSource.password", args[3]);//password
- }
-
- Config config = new Config();
- config.put(JDBC_CONF, map);
-
- ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
- connectionProvider.prepare();
- int queryTimeoutSecs = 60;
- JdbcClient jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
- for (String sql : setupSqls) {
- jdbcClient.executeSql(sql);
- }
-
- this.userSpout = new UserSpout();
- this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, connectionProvider);
- connectionProvider.cleanup();
- Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
- List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
- this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
- this.connectionProvider = new HikariCPConnectionProvider(map);
- if (args.length == 4) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", config, getTopology());
- Thread.sleep(30000);
- cluster.killTopology("test");
- cluster.shutdown();
- System.exit(0);
- } else {
- StormSubmitter.submitTopology(args[4], config, getTopology());
- }
- }
-
- public abstract StormTopology getTopology();
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
deleted file mode 100644
index 1915219..0000000
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jdbc.topology;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-import com.google.common.collect.Lists;
-import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
-import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
-import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
-import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
-
-import java.sql.Types;
-import java.util.List;
-
-
-public class UserPersistanceTopology extends AbstractUserTopology {
- private static final String USER_SPOUT = "USER_SPOUT";
- private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
- private static final String PERSISTANCE_BOLT = "PERSISTANCE_BOLT";
-
- public static void main(String[] args) throws Exception {
- new UserPersistanceTopology().execute(args);
- }
-
- @Override
- public StormTopology getTopology() {
- JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(connectionProvider, SELECT_QUERY, this.jdbcLookupMapper);
-
- //must specify column schema when providing custom query.
- List<Column> schemaColumns = Lists.newArrayList(new Column("create_date", Types.DATE),
- new Column("dept_name", Types.VARCHAR), new Column("user_id", Types.INTEGER), new Column("user_name", Types.VARCHAR));
- JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
-
- JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, mapper)
- .withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)");
-
- // userSpout ==> jdbcBolt
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout(USER_SPOUT, this.userSpout, 1);
- builder.setBolt(LOOKUP_BOLT, departmentLookupBolt, 1).shuffleGrouping(USER_SPOUT);
- builder.setBolt(PERSISTANCE_BOLT, userPersistanceBolt, 1).shuffleGrouping(LOOKUP_BOLT);
- return builder.createTopology();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
deleted file mode 100644
index 11269c3..0000000
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jdbc.topology;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import com.google.common.collect.Lists;
-import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
-import org.apache.storm.jdbc.spout.UserSpout;
-import org.apache.storm.jdbc.trident.state.JdbcQuery;
-import org.apache.storm.jdbc.trident.state.JdbcState;
-import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
-import org.apache.storm.jdbc.trident.state.JdbcUpdater;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-
-import java.sql.Types;
-
-public class UserPersistanceTridentTopology extends AbstractUserTopology {
-
- public static void main(String[] args) throws Exception {
- new UserPersistanceTridentTopology().execute(args);
- }
-
- @Override
- public StormTopology getTopology() {
- TridentTopology topology = new TridentTopology();
-
- JdbcState.Options options = new JdbcState.Options()
- .withConnectionProvider(connectionProvider)
- .withMapper(this.jdbcMapper)
- .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("dept_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
- .withTableName(TABLE_NAME)
- .withSelectQuery(SELECT_QUERY);
-
- JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
-
- Stream stream = topology.newStream("userSpout", new UserSpout());
- TridentState state = topology.newStaticState(jdbcStateFactory);
- stream = stream.stateQuery(state, new Fields("user_id","user_name","create_date"), new JdbcQuery(), new Fields("dept_name"));
- stream.partitionPersist(jdbcStateFactory, new Fields("user_id","user_name","dept_name","create_date"), new JdbcUpdater(), new Fields());
- return topology.build();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java
deleted file mode 100644
index fdc6752..0000000
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import com.google.common.collect.ImmutableMap;
-import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
-import org.apache.storm.kafka.trident.TridentKafkaUpdater;
-import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
-import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-
-import java.util.Properties;
-
-public class TridentKafkaTopology {
-
- private static StormTopology buildTopology(String brokerConnectionString) {
- Fields fields = new Fields("word", "count");
- FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
- new Values("storm", "1"),
- new Values("trident", "1"),
- new Values("needs", "1"),
- new Values("javadoc", "1")
- );
- spout.setCycle(true);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("spout1", spout);
-
- Properties props = new Properties();
- props.put("bootstrap.servers", brokerConnectionString);
- props.put("acks", "1");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
- .withProducerProperties(props)
- .withKafkaTopicSelector(new DefaultTopicSelector("test"))
- .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
- stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
-
- return topology.build();
- }
-
- /**
- * To run this topology ensure you have a kafka broker running and provide connection string to broker as argument.
- * Create a topic test with command line,
- * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test
- *
- * run this program and run the kafka consumer:
- * kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
- *
- * you should see the messages flowing through.
- *
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- if(args.length < 1) {
- System.out.println("Please provide kafka broker url ,e.g. localhost:9092");
- }
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("wordCounter", new Config(), buildTopology(args[0]));
- Thread.sleep(60 * 1000);
- cluster.killTopology("wordCounter");
-
- cluster.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java
deleted file mode 100644
index c83bdbd..0000000
--- a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mongodb.topology;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.mongodb.bolt.MongoInsertBolt;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class InsertWordCount {
- private static final String WORD_SPOUT = "WORD_SPOUT";
- private static final String COUNT_BOLT = "COUNT_BOLT";
- private static final String INSERT_BOLT = "INSERT_BOLT";
-
- private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test";
- private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
-
-
- public static void main(String[] args) throws Exception {
- Config config = new Config();
-
- String url = TEST_MONGODB_URL;
- String collectionName = TEST_MONGODB_COLLECTION_NAME;
-
- if (args.length >= 2) {
- url = args[0];
- collectionName = args[1];
- }
-
- WordSpout spout = new WordSpout();
- WordCounter bolt = new WordCounter();
-
- MongoMapper mapper = new SimpleMongoMapper()
- .withFields("word", "count");
-
- MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
-
- // wordSpout ==> countBolt ==> MongoInsertBolt
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout(WORD_SPOUT, spout, 1);
- builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
- builder.setBolt(INSERT_BOLT, insertBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
-
-
- if (args.length == 2) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", config, builder.createTopology());
- Thread.sleep(30000);
- cluster.killTopology("test");
- cluster.shutdown();
- System.exit(0);
- } else if (args.length == 3) {
- StormSubmitter.submitTopology(args[2], config, builder.createTopology());
- } else{
- System.out.println("Usage: InsertWordCount <mongodb url> <mongodb collection> [topology name]");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
deleted file mode 100644
index 071708e..0000000
--- a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mongodb.topology;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.mongodb.bolt.MongoInsertBolt;
-import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class UpdateWordCount {
- private static final String WORD_SPOUT = "WORD_SPOUT";
- private static final String COUNT_BOLT = "COUNT_BOLT";
- private static final String UPDATE_BOLT = "UPDATE_BOLT";
-
- private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test";
- private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
-
-
- public static void main(String[] args) throws Exception {
- Config config = new Config();
-
- String url = TEST_MONGODB_URL;
- String collectionName = TEST_MONGODB_COLLECTION_NAME;
-
- if (args.length >= 2) {
- url = args[0];
- collectionName = args[1];
- }
-
- WordSpout spout = new WordSpout();
- WordCounter bolt = new WordCounter();
-
- MongoMapper mapper = new SimpleMongoUpdateMapper()
- .withFields("word", "count");
-
- QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
- .withField("word");
-
- MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator , mapper);
-
- //if a new document should be inserted if there are no matches to the query filter
- //updateBolt.withUpsert(true);
-
- // wordSpout ==> countBolt ==> MongoUpdateBolt
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout(WORD_SPOUT, spout, 1);
- builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
- builder.setBolt(UPDATE_BOLT, updateBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
-
-
- if (args.length == 2) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", config, builder.createTopology());
- Thread.sleep(30000);
- cluster.killTopology("test");
- cluster.shutdown();
- System.exit(0);
- } else if (args.length == 3) {
- StormSubmitter.submitTopology(args[2], config, builder.createTopology());
- } else{
- System.out.println("Usage: UpdateWordCount <mongodb url> <mongodb collection> [topology name]");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java
deleted file mode 100644
index 481f959..0000000
--- a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mongodb.topology;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.IBasicBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import com.google.common.collect.Maps;
-
-import java.util.Map;
-
-import static org.apache.storm.utils.Utils.tuple;
-
-public class WordCounter implements IBasicBolt {
- private Map<String, Integer> wordCounter = Maps.newHashMap();
-
- public void prepare(Map stormConf, TopologyContext context) {
-
- }
-
- public void execute(Tuple input, BasicOutputCollector collector) {
- String word = input.getStringByField("word");
- int count;
- if (wordCounter.containsKey(word)) {
- count = wordCounter.get(word) + 1;
- wordCounter.put(word, wordCounter.get(word) + 1);
- } else {
- count = 1;
- }
-
- wordCounter.put(word, count);
- collector.emit(new Values(word, String.valueOf(count)));
- }
-
- public void cleanup() {
-
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java
deleted file mode 100644
index 284f228..0000000
--- a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mongodb.topology;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-
-public class WordSpout implements IRichSpout {
- boolean isDistributed;
- SpoutOutputCollector collector;
- public static final String[] words = new String[] { "apple", "orange", "pineapple", "banana", "watermelon" };
-
- public WordSpout() {
- this(true);
- }
-
- public WordSpout(boolean isDistributed) {
- this.isDistributed = isDistributed;
- }
-
- public boolean isDistributed() {
- return this.isDistributed;
- }
-
- @SuppressWarnings("rawtypes")
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.collector = collector;
- }
-
- public void close() {
-
- }
-
- public void nextTuple() {
- final Random rand = new Random();
- final String word = words[rand.nextInt(words.length)];
- this.collector.emit(new Values(word), UUID.randomUUID());
- Thread.yield();
- }
-
- public void ack(Object msgId) {
-
- }
-
- public void fail(Object msgId) {
-
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
- @Override
- public void activate() {
- }
-
- @Override
- public void deactivate() {
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java
deleted file mode 100644
index 7a18863..0000000
--- a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.mongodb.trident;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
-import org.apache.storm.mongodb.trident.state.MongoState;
-import org.apache.storm.mongodb.trident.state.MongoStateFactory;
-import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class WordCountTrident {
-
- public static StormTopology buildTopology(String url, String collectionName){
- Fields fields = new Fields("word", "count");
- FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
- new Values("storm", 1),
- new Values("trident", 1),
- new Values("needs", 1),
- new Values("javadoc", 1)
- );
- spout.setCycle(true);
-
- MongoMapper mapper = new SimpleMongoMapper()
- .withFields("word", "count");
-
- MongoState.Options options = new MongoState.Options()
- .withUrl(url)
- .withCollectionName(collectionName)
- .withMapper(mapper);
-
- StateFactory factory = new MongoStateFactory(options);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("spout1", spout);
-
- stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields());
- return topology.build();
- }
-
- public static void main(String[] args) throws Exception {
- Config conf = new Config();
- conf.setMaxSpoutPending(5);
- if (args.length == 2) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));
- Thread.sleep(60 * 1000);
- cluster.killTopology("wordCounter");
- cluster.shutdown();
- System.exit(0);
- }
- else if(args.length == 3) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
- } else{
- System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-mqtt/examples/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/examples/pom.xml b/external/storm-mqtt/examples/pom.xml
deleted file mode 100644
index 3b152ae..0000000
--- a/external/storm-mqtt/examples/pom.xml
+++ /dev/null
@@ -1,115 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>storm-mqtt-examples</artifactId>
- <packaging>jar</packaging>
-
- <name>storm-mqtt-examples</name>
-
- <parent>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-mqtt-parent</artifactId>
- <version>1.1.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-mqtt</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>2.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <version>2.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>${provided.scope}</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>flux-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.fusesource.mqtt-client</groupId>
- <artifactId>mqtt-client</artifactId>
- <version>1.10</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-broker</artifactId>
- <version>5.9.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-mqtt</artifactId>
- <version>5.9.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-kahadb-store</artifactId>
- <version>5.9.0</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>1.4</version>
- <configuration>
- <createDependencyReducedPom>true</createDependencyReducedPom>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.storm.flux.Flux</mainClass>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-mqtt/examples/src/main/flux/sample.yaml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/examples/src/main/flux/sample.yaml b/external/storm-mqtt/examples/src/main/flux/sample.yaml
deleted file mode 100644
index c2902dc..0000000
--- a/external/storm-mqtt/examples/src/main/flux/sample.yaml
+++ /dev/null
@@ -1,62 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
----
-
-# topology definition
-# name to be used when submitting
-name: "mqtt-topology"
-
-components:
- ########## MQTT Spout Config ############
- - id: "mqtt-type"
- className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
-
- - id: "mqtt-options"
- className: "org.apache.storm.mqtt.common.MqttOptions"
- properties:
- - name: "url"
- value: "tcp://localhost:1883"
- - name: "topics"
- value:
- - "/users/tgoetz/#"
-
-# topology configuration
-config:
- topology.workers: 1
- topology.max.spout.pending: 1000
-
-# spout definitions
-spouts:
- - id: "mqtt-spout"
- className: "org.apache.storm.mqtt.spout.MqttSpout"
- constructorArgs:
- - ref: "mqtt-type"
- - ref: "mqtt-options"
- parallelism: 1
-
-# bolt definitions
-bolts:
- - id: "log"
- className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
- parallelism: 1
-
-
-streams:
- - from: "mqtt-spout"
- to: "log"
- grouping:
- type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/external/storm-mqtt/examples/src/main/flux/ssl-sample.yaml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/examples/src/main/flux/ssl-sample.yaml b/external/storm-mqtt/examples/src/main/flux/ssl-sample.yaml
deleted file mode 100644
index bfb668d..0000000
--- a/external/storm-mqtt/examples/src/main/flux/ssl-sample.yaml
+++ /dev/null
@@ -1,78 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
----
-
-# topology definition
-# name to be used when submitting
-name: "mqtt-topology"
-
-components:
- ########## MQTT Spout Config ############
- - id: "mqtt-type"
- className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
-
- - id: "keystore-loader"
- className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
- constructorArgs:
- - "keystore.jks"
- - "truststore.jks"
- properties:
- - name: "keyPassword"
- value: "password"
- - name: "keyStorePassword"
- value: "password"
- - name: "trustStorePassword"
- value: "password"
-
- - id: "mqtt-options"
- className: "org.apache.storm.mqtt.common.MqttOptions"
- properties:
- - name: "url"
- value: "ssl://raspberrypi.local:8883"
- - name: "topics"
- value:
- - "/users/tgoetz/#"
-
-# topology configuration
-config:
- topology.workers: 1
- topology.max.spout.pending: 1000
-
-# spout definitions
-spouts:
- - id: "mqtt-spout"
- className: "org.apache.storm.mqtt.spout.MqttSpout"
- constructorArgs:
- - ref: "mqtt-type"
- - ref: "mqtt-options"
- - ref: "keystore-loader"
- parallelism: 1
-
-# bolt definitions
-bolts:
-
- - id: "log"
- className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
- parallelism: 1
-
-
-streams:
-
- - from: "mqtt-spout"
- to: "log"
- grouping:
- type: SHUFFLE