You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/12 04:16:12 UTC
[07/10] storm git commit: STORM-1970: external project examples
refactor
STORM-1970: external project examples refactor
* resolve conflict by Jungtaek Lim (kabhwan@gmail.com)
* resolve version mismatch and some external modules after added
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/97fe209e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/97fe209e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/97fe209e
Branch: refs/heads/1.x-branch
Commit: 97fe209ee6c7d2abe6807732e03baa8950c768ea
Parents: bc0a1b8
Author: vesense <be...@163.com>
Authored: Thu Jul 14 17:13:39 2016 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Sep 12 13:09:35 2016 +0900
----------------------------------------------------------------------
examples/storm-elasticsearch-examples/pom.xml | 44 ++++
.../elasticsearch/bolt/EsIndexTopology.java | 120 +++++++++++
.../storm/elasticsearch/common/EsConstants.java | 22 ++
.../storm/elasticsearch/common/EsTestUtil.java | 75 +++++++
.../trident/TridentEsTopology.java | 135 +++++++++++++
examples/storm-hbase-examples/pom.xml | 44 ++++
.../storm/hbase/topology/LookupWordCount.java | 79 ++++++++
.../hbase/topology/PersistentWordCount.java | 91 +++++++++
.../storm/hbase/topology/TotalWordCounter.java | 70 +++++++
.../storm/hbase/topology/WordCountClient.java | 57 ++++++
.../hbase/topology/WordCountValueMapper.java | 70 +++++++
.../storm/hbase/topology/WordCounter.java | 59 ++++++
.../apache/storm/hbase/topology/WordSpout.java | 88 ++++++++
.../storm/hbase/trident/PrintFunction.java | 40 ++++
.../storm/hbase/trident/WordCountTrident.java | 104 ++++++++++
examples/storm-hdfs-examples/pom.xml | 44 ++++
.../storm/hdfs/bolt/HdfsFileTopology.java | 196 ++++++++++++++++++
.../storm/hdfs/bolt/SequenceFileTopology.java | 202 +++++++++++++++++++
.../storm/hdfs/trident/FixedBatchSpout.java | 97 +++++++++
.../storm/hdfs/trident/TridentFileTopology.java | 99 +++++++++
.../hdfs/trident/TridentSequenceTopology.java | 96 +++++++++
examples/storm-hive-examples/pom.xml | 44 ++++
.../storm/hive/bolt/BucketTestHiveTopology.java | 189 +++++++++++++++++
.../apache/storm/hive/bolt/HiveTopology.java | 151 ++++++++++++++
.../hive/bolt/HiveTopologyPartitioned.java | 153 ++++++++++++++
.../storm/hive/trident/TridentHiveTopology.java | 199 ++++++++++++++++++
examples/storm-jdbc-examples/pom.xml | 44 ++++
.../org/apache/storm/jdbc/spout/UserSpout.java | 90 +++++++++
.../jdbc/topology/AbstractUserTopology.java | 115 +++++++++++
.../jdbc/topology/UserPersistanceTopology.java | 62 ++++++
.../UserPersistanceTridentTopology.java | 61 ++++++
examples/storm-kafka-examples/pom.xml | 44 ++++
.../storm/kafka/TridentKafkaTopology.java | 91 +++++++++
examples/storm-mongodb-examples/pom.xml | 44 ++++
.../storm/mongodb/topology/InsertWordCount.java | 81 ++++++++
.../storm/mongodb/topology/UpdateWordCount.java | 91 +++++++++
.../storm/mongodb/topology/WordCounter.java | 67 ++++++
.../storm/mongodb/topology/WordSpout.java | 88 ++++++++
.../storm/mongodb/trident/WordCountTrident.java | 85 ++++++++
examples/storm-mqtt-examples/pom.xml | 115 +++++++++++
.../src/main/flux/sample.yaml | 62 ++++++
.../src/main/flux/ssl-sample.yaml | 78 +++++++
.../mqtt/examples/CustomMessageMapper.java | 49 +++++
.../mqtt/examples/MqttBrokerPublisher.java | 102 ++++++++++
.../src/main/resources/log4j2.xml | 32 +++
examples/storm-opentsdb-examples/pom.xml | 44 ++++
.../storm/opentsdb/MetricGenBatchSpout.java | 94 +++++++++
.../apache/storm/opentsdb/MetricGenSpout.java | 72 +++++++
.../opentsdb/SampleOpenTsdbBoltTopology.java | 70 +++++++
.../opentsdb/SampleOpenTsdbTridentTopology.java | 87 ++++++++
examples/storm-redis-examples/pom.xml | 44 ++++
.../storm/redis/topology/LookupWordCount.java | 166 +++++++++++++++
.../redis/topology/PersistentWordCount.java | 116 +++++++++++
.../redis/topology/WhitelistWordCount.java | 155 ++++++++++++++
.../storm/redis/topology/WordCounter.java | 67 ++++++
.../apache/storm/redis/topology/WordSpout.java | 88 ++++++++
.../storm/redis/trident/PrintFunction.java | 40 ++++
.../redis/trident/WordCountLookupMapper.java | 57 ++++++
.../redis/trident/WordCountStoreMapper.java | 39 ++++
.../redis/trident/WordCountTridentRedis.java | 98 +++++++++
.../trident/WordCountTridentRedisCluster.java | 106 ++++++++++
.../WordCountTridentRedisClusterMap.java | 101 ++++++++++
.../redis/trident/WordCountTridentRedisMap.java | 94 +++++++++
examples/storm-solr-examples/pom.xml | 44 ++++
.../storm/solr/spout/SolrFieldsSpout.java | 76 +++++++
.../apache/storm/solr/spout/SolrJsonSpout.java | 116 +++++++++++
.../storm/solr/topology/SolrFieldsTopology.java | 56 +++++
.../storm/solr/topology/SolrJsonTopology.java | 48 +++++
.../storm/solr/topology/SolrTopology.java | 82 ++++++++
.../solr/trident/SolrFieldsTridentTopology.java | 45 +++++
.../solr/trident/SolrJsonTridentTopology.java | 45 +++++
.../org/apache/storm/solr/util/TestUtil.java | 30 +++
.../elasticsearch/bolt/EsIndexTopology.java | 120 -----------
.../trident/TridentEsTopology.java | 135 -------------
.../storm/hbase/topology/LookupWordCount.java | 79 --------
.../hbase/topology/PersistentWordCount.java | 91 ---------
.../storm/hbase/topology/TotalWordCounter.java | 70 -------
.../storm/hbase/topology/WordCountClient.java | 57 ------
.../hbase/topology/WordCountValueMapper.java | 70 -------
.../storm/hbase/topology/WordCounter.java | 59 ------
.../apache/storm/hbase/topology/WordSpout.java | 88 --------
.../storm/hbase/trident/PrintFunction.java | 40 ----
.../storm/hbase/trident/WordCountTrident.java | 104 ----------
.../storm/hdfs/bolt/HdfsFileTopology.java | 196 ------------------
.../storm/hdfs/bolt/SequenceFileTopology.java | 202 -------------------
.../storm/hdfs/trident/FixedBatchSpout.java | 97 ---------
.../storm/hdfs/trident/TridentFileTopology.java | 99 ---------
.../hdfs/trident/TridentSequenceTopology.java | 96 ---------
.../storm/hive/bolt/BucketTestHiveTopology.java | 190 -----------------
.../apache/storm/hive/bolt/HiveTopology.java | 151 --------------
.../hive/bolt/HiveTopologyPartitioned.java | 153 --------------
.../storm/hive/trident/TridentHiveTopology.java | 199 ------------------
.../org/apache/storm/jdbc/spout/UserSpout.java | 90 ---------
.../jdbc/topology/AbstractUserTopology.java | 115 -----------
.../jdbc/topology/UserPersistanceTopology.java | 62 ------
.../UserPersistanceTridentTopology.java | 61 ------
.../storm/kafka/TridentKafkaTopology.java | 91 ---------
.../storm/mongodb/topology/InsertWordCount.java | 81 --------
.../storm/mongodb/topology/UpdateWordCount.java | 91 ---------
.../storm/mongodb/topology/WordCounter.java | 67 ------
.../storm/mongodb/topology/WordSpout.java | 88 --------
.../storm/mongodb/trident/WordCountTrident.java | 85 --------
external/storm-mqtt/examples/pom.xml | 115 -----------
.../examples/src/main/flux/sample.yaml | 62 ------
.../examples/src/main/flux/ssl-sample.yaml | 78 -------
.../mqtt/examples/CustomMessageMapper.java | 49 -----
.../mqtt/examples/MqttBrokerPublisher.java | 102 ----------
.../examples/src/main/resources/log4j2.xml | 32 ---
external/storm-mqtt/pom.xml | 1 -
.../storm/opentsdb/MetricGenBatchSpout.java | 94 ---------
.../apache/storm/opentsdb/MetricGenSpout.java | 72 -------
.../opentsdb/SampleOpenTsdbBoltTopology.java | 70 -------
.../opentsdb/SampleOpenTsdbTridentTopology.java | 87 --------
.../storm/redis/topology/LookupWordCount.java | 166 ---------------
.../redis/topology/PersistentWordCount.java | 116 -----------
.../redis/topology/WhitelistWordCount.java | 155 --------------
.../storm/redis/topology/WordCounter.java | 67 ------
.../apache/storm/redis/topology/WordSpout.java | 88 --------
.../storm/redis/trident/PrintFunction.java | 40 ----
.../redis/trident/WordCountLookupMapper.java | 57 ------
.../redis/trident/WordCountStoreMapper.java | 39 ----
.../redis/trident/WordCountTridentRedis.java | 98 ---------
.../trident/WordCountTridentRedisCluster.java | 106 ----------
.../WordCountTridentRedisClusterMap.java | 101 ----------
.../redis/trident/WordCountTridentRedisMap.java | 94 ---------
.../storm/solr/spout/SolrFieldsSpout.java | 76 -------
.../apache/storm/solr/spout/SolrJsonSpout.java | 120 -----------
.../storm/solr/topology/SolrFieldsTopology.java | 56 -----
.../storm/solr/topology/SolrJsonTopology.java | 48 -----
.../storm/solr/topology/SolrTopology.java | 82 --------
.../solr/trident/SolrFieldsTridentTopology.java | 45 -----
.../solr/trident/SolrJsonTridentTopology.java | 45 -----
.../org/apache/storm/solr/util/TestUtil.java | 30 ---
pom.xml | 23 ++-
134 files changed, 6068 insertions(+), 5522 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-elasticsearch-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/pom.xml b/examples/storm-elasticsearch-examples/pom.xml
new file mode 100644
index 0000000..eceb196
--- /dev/null
+++ b/examples/storm-elasticsearch-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-elasticsearch-examples</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-elasticsearch</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
new file mode 100644
index 0000000..d30424b
--- /dev/null
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsConstants;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Example topology that indexes a stream of small JSON user documents into an embedded
+ * Elasticsearch node through {@link EsIndexBolt}.
+ *
+ * <p>The topology runs inside a {@link LocalCluster} for a fixed time and then shuts down.
+ */
+public class EsIndexTopology {
+
+    static final String SPOUT_ID = "spout";
+    static final String BOLT_ID = "bolt";
+    static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
+
+    /**
+     * Starts an embedded Elasticsearch node, runs the topology locally for about 20 seconds,
+     * then shuts the local cluster down and exits the JVM.
+     *
+     * @param args ignored
+     * @throws Exception if the topology cannot be submitted
+     */
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+        config.setNumWorkers(1);
+        TopologyBuilder builder = new TopologyBuilder();
+        UserDataSpout spout = new UserDataSpout();
+        builder.setSpout(SPOUT_ID, spout, 1);
+        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+        EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
+        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
+
+        EsTestUtil.startEsNode();
+        // Give the embedded node time to come up before the bolt connects to it.
+        EsTestUtil.waitForSeconds(5);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+        EsTestUtil.waitForSeconds(20);
+        cluster.killTopology(TOPOLOGY_NAME);
+        System.out.println("cluster begin to shutdown");
+        cluster.shutdown();
+        System.out.println("cluster shutdown");
+        System.exit(0);
+    }
+
+    /**
+     * Reliable spout that cycles through a fixed set of JSON documents and re-emits failed tuples.
+     *
+     * <p>Each tuple has the fields {@code source, index, type, id}. The {@code id} value is the
+     * string form of the tuple's message id, so mappers that read the field as a String work.
+     */
+    public static class UserDataSpout extends BaseRichSpout {
+        // Emitted-but-not-yet-acked tuples, keyed by message id, kept so failures can be replayed.
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sources = {
+                "{\"user\":\"user1\"}",
+                "{\"user\":\"user2\"}",
+                "{\"user\":\"user3\"}",
+                "{\"user\":\"user4\"}"
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+        private String indexName = "index1";
+        private String typeName = "type1";
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("source", "index", "type", "id"));
+        }
+
+        @Override
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        /**
+         * Emits the next document (round-robin over {@code sources}) anchored with a fresh UUID
+         * and prints progress roughly every thousand tuples.
+         */
+        @Override
+        public void nextTuple() {
+            String source = sources[index];
+            UUID msgId = UUID.randomUUID();
+            // The "id" field must be a String; a raw UUID object would fail string-typed field access.
+            Values values = new Values(source, indexName, typeName, msgId.toString());
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sources.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if (count > 1000) {
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+            Thread.yield();
+        }
+
+        /** Forgets an acknowledged tuple. */
+        @Override
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        /** Re-emits a failed tuple with the same message id, if it is still pending. */
+        @Override
+        public void fail(Object msgId) {
+            Values values = this.pending.get(msgId);
+            if (values == null) {
+                // Nothing to replay; emitting null values would break the collector.
+                return;
+            }
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(values, msgId);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsConstants.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsConstants.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsConstants.java
new file mode 100644
index 0000000..98bb71d
--- /dev/null
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsConstants.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+/**
+ * Shared constants for the Elasticsearch examples.
+ */
+public class EsConstants {
+    /** Cluster name used both by the embedded test node and by the topologies' {@code EsConfig}. */
+    public static final String clusterName = "test-cluster";
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
new file mode 100644
index 0000000..cb1c745
--- /dev/null
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+
+import java.util.HashMap;
+
+/**
+ * Helpers shared by the Elasticsearch examples: building test tuples and tuple mappers,
+ * starting an embedded Elasticsearch node, and sleeping between steps.
+ */
+public class EsTestUtil {
+    /**
+     * Builds a standalone tuple whose fields are {@code source, index, type, id}.
+     *
+     * @param source JSON document body
+     * @param index  target index name
+     * @param type   target document type
+     * @param id     document id
+     * @return a tuple backed by a throwaway topology context
+     */
+    public static Tuple generateTestTuple(String source, String index, String type, String id) {
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            @Override
+            public Fields getComponentOutputFields(String componentId, String streamId) {
+                return new Fields("source", "index", "type", "id");
+            }
+        };
+        return new TupleImpl(topologyContext, new Values(source, index, type, id), 1, "");
+    }
+
+    /** Returns a new {@link DefaultEsTupleMapper}. */
+    public static EsTupleMapper generateDefaultTupleMapper() {
+        return new DefaultEsTupleMapper();
+    }
+
+    /**
+     * Starts an embedded Elasticsearch data node in the cluster {@link EsConstants#clusterName},
+     * with one shard, no replicas, HTTP disabled and the "memory" index store type.
+     *
+     * @return the started node; closing it is left to the caller
+     */
+    public static Node startEsNode(){
+        Node node = NodeBuilder.nodeBuilder().data(true).settings(
+                ImmutableSettings.builder()
+                        .put(ClusterName.SETTING, EsConstants.clusterName)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .put(EsExecutors.PROCESSORS, 1)
+                        .put("http.enabled", false)
+                        .put("index.percolator.map_unmapped_fields_as_string", true)
+                        .put("index.store.type", "memory")
+        ).build();
+        node.start();
+        return node;
+    }
+
+    /**
+     * Sleeps for the given number of seconds.
+     *
+     * <p>If the thread is interrupted the sleep ends early and the interrupt flag is restored.
+     *
+     * @param seconds how long to sleep
+     */
+    public static void waitForSeconds(int seconds) {
+        try {
+            // long arithmetic so large values cannot overflow the millisecond count
+            Thread.sleep(seconds * 1000L);
+        } catch (InterruptedException e) {
+            // Do not swallow the interruption: re-assert it so callers can observe it.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
new file mode 100644
index 0000000..67eab5b
--- /dev/null
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsConstants;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.*;
+
+/**
+ * Example Trident topology that persists a stream of JSON documents into an embedded
+ * Elasticsearch node using {@link EsStateFactory} and {@link EsUpdater}.
+ */
+public class TridentEsTopology {
+
+    static final String TOPOLOGY_NAME = "elasticsearch-test-topology2";
+
+    /**
+     * Starts an embedded Elasticsearch node, runs the Trident topology locally for about
+     * 20 seconds, then shuts the local cluster down and exits the JVM.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        int batchSize = 100;
+        FixedBatchSpout spout = new FixedBatchSpout(batchSize);
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout", spout);
+        EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
+        // partitionPersist hands the updater only these fields, so every field the tuple mapper
+        // reads (including "id") has to be listed.
+        Fields esFields = new Fields("index", "type", "source", "id");
+        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+        StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
+        // The returned TridentState is not needed: this example only writes to Elasticsearch.
+        stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+
+        EsTestUtil.startEsNode();
+        // Give the embedded node time to come up before the state connects to it.
+        EsTestUtil.waitForSeconds(5);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(TOPOLOGY_NAME, new Config(), topology.build());
+        EsTestUtil.waitForSeconds(20);
+        cluster.killTopology(TOPOLOGY_NAME);
+        System.out.println("cluster begin to shutdown");
+        cluster.shutdown();
+        System.out.println("cluster shutdown");
+        System.exit(0);
+    }
+
+    /**
+     * Batch spout that builds batches from a fixed list of four documents.
+     *
+     * <p>Each batch holds up to {@code maxBatchSize} tuples. With cycling enabled the documents are
+     * repeated to fill every batch. With cycling disabled each document is emitted once, and later
+     * batches are empty. Batches are cached until acked, so a replayed batch id re-emits the same
+     * tuples.
+     */
+    public static class FixedBatchSpout implements IBatchSpout {
+        int maxBatchSize;
+        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+        private Values[] outputs = {
+                new Values("{\"user\":\"user1\"}", "index1", "type1", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user2\"}", "index1", "type2", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user3\"}", "index2", "type1", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user4\"}", "index2", "type2", UUID.randomUUID().toString())
+        };
+        // Position of the next document in outputs to place into a new batch.
+        private int index = 0;
+        boolean cycle = false;
+
+        public FixedBatchSpout(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+        }
+
+        /** Enables or disables endless repetition of the fixed documents. */
+        public void setCycle(boolean cycle) {
+            this.cycle = cycle;
+        }
+
+        @Override
+        public Fields getOutputFields() {
+            return new Fields("source", "index", "type", "id");
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context) {
+            index = 0;
+        }
+
+        /**
+         * Emits the batch for {@code batchId}, building and caching it on first request.
+         */
+        @Override
+        public void emitBatch(long batchId, TridentCollector collector) {
+            List<List<Object>> batch = this.batches.get(batchId);
+            if (batch == null) {
+                batch = new ArrayList<List<Object>>();
+                for (int i = 0; i < maxBatchSize; i++) {
+                    if (index >= outputs.length) {
+                        if (!cycle) {
+                            // Every document was emitted once and cycling is off: stop filling.
+                            break;
+                        }
+                        index = 0;
+                    }
+                    batch.add(outputs[index++]);
+                }
+                this.batches.put(batchId, batch);
+            }
+            for (List<Object> list : batch) {
+                collector.emit(list);
+            }
+        }
+
+        /** Drops the cached copy of a batch once it has been fully processed. */
+        @Override
+        public void ack(long batchId) {
+            this.batches.remove(batchId);
+        }
+
+        @Override
+        public void close() {
+        }
+
+        /** Limits this spout to a single task. */
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            Config conf = new Config();
+            conf.setMaxTaskParallelism(1);
+            return conf;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/pom.xml b/examples/storm-hbase-examples/pom.xml
new file mode 100644
index 0000000..ac5faaf
--- /dev/null
+++ b/examples/storm-hbase-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-hbase-examples</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hbase</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java
new file mode 100644
index 0000000..43f72ae
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.hbase.bolt.HBaseLookupBolt;
+import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
+import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Example topology that looks up word counts in the HBase table {@code WordCount} and feeds the
+ * results to a {@link TotalWordCounter}.
+ *
+ * <p>Usage: {@code LookupWordCount <hbase.rootdir> [topology name]}. With only the HBase root dir
+ * the topology runs in a local cluster for 30 seconds. With a topology name it is submitted via
+ * {@link StormSubmitter}.
+ */
+public class LookupWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+    private static final String TOTAL_COUNT_BOLT = "TOTAL_COUNT_BOLT";
+
+    /**
+     * Builds and runs the lookup topology.
+     *
+     * @param args {@code args[0]} is the value for {@code hbase.rootdir}; the optional
+     *             {@code args[1]} is the topology name used for cluster submission
+     * @throws Exception if submission fails or the local run is interrupted
+     */
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        // HBase client settings reach the bolt through the topology config under "hbase.conf".
+        Map<String, Object> hbConf = new HashMap<String, Object>();
+        if (args.length > 0) {
+            hbConf.put("hbase.rootdir", args[0]);
+        }
+        config.put("hbase.conf", hbConf);
+
+        WordSpout spout = new WordSpout();
+        TotalWordCounter totalBolt = new TotalWordCounter();
+
+        SimpleHBaseMapper mapper = new SimpleHBaseMapper().withRowKeyField("word");
+        HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria();
+        projectionCriteria.addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"));
+
+        WordCountValueMapper rowToTupleMapper = new WordCountValueMapper();
+
+        HBaseLookupBolt hBaseLookupBolt = new HBaseLookupBolt("WordCount", mapper, rowToTupleMapper)
+                .withConfigKey("hbase.conf")
+                .withProjectionCriteria(projectionCriteria);
+
+        // wordspout -> lookupbolt -> totalCountBolt
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(LOOKUP_BOLT, hBaseLookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(TOTAL_COUNT_BOLT, totalBolt, 1).fieldsGrouping(LOOKUP_BOLT, new Fields("columnName"));
+
+        if (args.length == 1) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 2) {
+            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+        } else {
+            // Document both accepted forms: local run (1 arg) and cluster submission (2 args).
+            System.out.println("Usage: LookupWordCount <hbase.rootdir> [topology name]");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java
new file mode 100644
index 0000000..cfb94d0
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.hbase.bolt.HBaseBolt;
+import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
+import org.apache.storm.hbase.security.HBaseSecurityUtil;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class PersistentWordCount {
+ private static final String WORD_SPOUT = "WORD_SPOUT";
+ private static final String COUNT_BOLT = "COUNT_BOLT";
+ private static final String HBASE_BOLT = "HBASE_BOLT";
+
+
+ public static void main(String[] args) throws Exception {
+ Config config = new Config();
+
+ Map<String, Object> hbConf = new HashMap<String, Object>();
+ if(args.length > 0){
+ hbConf.put("hbase.rootdir", args[0]);
+ }
+ config.put("hbase.conf", hbConf);
+
+ WordSpout spout = new WordSpout();
+ WordCounter bolt = new WordCounter();
+
+ SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+ .withRowKeyField("word")
+ .withColumnFields(new Fields("word"))
+ .withCounterFields(new Fields("count"))
+ .withColumnFamily("cf");
+
+ HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
+ .withConfigKey("hbase.conf");
+
+
+ // wordSpout ==> countBolt ==> HBaseBolt
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout(WORD_SPOUT, spout, 1);
+ builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+ if (args.length == 1) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, builder.createTopology());
+ Thread.sleep(30000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ System.exit(0);
+ } else if (args.length == 2) {
+ StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+ } else if (args.length == 4) {
+ System.out.println("hdfs url: " + args[0] + ", keytab file: " + args[2] +
+ ", principal name: " + args[3] + ", toplogy name: " + args[1]);
+ hbConf.put(HBaseSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);
+ hbConf.put(HBaseSecurityUtil.STORM_USER_NAME_KEY, args[3]);
+ config.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+ } else {
+ System.out.println("Usage: PersistentWordCount <hbase.rootdir> [topology name] [keytab file] [principal name]");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java
new file mode 100644
index 0000000..61b0dd8
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+public class TotalWordCounter implements IBasicBolt {
+
+ private BigInteger total = BigInteger.ZERO;
+ private static final Logger LOG = LoggerFactory.getLogger(TotalWordCounter.class);
+ private static final Random RANDOM = new Random();
+ @SuppressWarnings("rawtypes")
+ public void prepare(Map stormConf, TopologyContext context) {
+ }
+
+ /*
+ * Just output the word value with a count of 1.
+ * The HBaseBolt will handle incrementing the counter.
+ */
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ total = total.add(new BigInteger(input.getValues().get(1).toString()));
+ collector.emit(tuple(total.toString()));
+ //prints the total with low probability.
+ if(RANDOM.nextInt(1000) > 995) {
+ LOG.info("Running total = " + total);
+ }
+ }
+
+ public void cleanup() {
+ LOG.info("Final total = " + total);
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("total"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
new file mode 100644
index 0000000..33ce450
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Connects to the 'WordCount' table and prints counts for each word.
+ *
+ * Assumes you have run (or are running) <code>PersistentWordCount</code>
+ */
+public class WordCountClient {
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = HBaseConfiguration.create();
+ if(args.length > 0){
+ config.set("hbase.rootdir", args[0]);
+ }
+
+ HTable table = new HTable(config, "WordCount");
+
+
+ for (String word : WordSpout.words) {
+ Get get = new Get(Bytes.toBytes(word));
+ Result result = table.get(get);
+
+ byte[] countBytes = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count"));
+ byte[] wordBytes = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("word"));
+
+ String wordStr = Bytes.toString(wordBytes);
+ System.out.println(wordStr);
+ long count = Bytes.toLong(countBytes);
+ System.out.println("Word: '" + wordStr + "', Count: " + count);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountValueMapper.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
new file mode 100644
index 0000000..6c3301b
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Takes a Hbase result and returns a value list that has a value instance for each column and corresponding value.
+ * So if the result from Hbase was
+ * <pre>
+ * WORD, COUNT
+ * apple, 10
+ * bannana, 20
+ * </pre>
+ *
+ * this will return
+ * <pre>
+ * [WORD, apple]
+ * [COUNT, 10]
+ * [WORD, banana]
+ * [COUNT, 20]
+ * </pre>
+ *
+ */
+public class WordCountValueMapper implements HBaseValueMapper {
+
+ @Override
+ public List<Values> toValues(ITuple tuple, Result result) throws Exception {
+ List<Values> values = new ArrayList<Values>();
+ Cell[] cells = result.rawCells();
+ for(Cell cell : cells) {
+ Values value = new Values (Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toLong(CellUtil.cloneValue(cell)));
+ values.add(value);
+ }
+ return values;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("columnName","columnValue"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java
new file mode 100644
index 0000000..3a350a8
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+public class WordCounter implements IBasicBolt {
+
+
+ @SuppressWarnings("rawtypes")
+ public void prepare(Map stormConf, TopologyContext context) {
+ }
+
+ /*
+ * Just output the word value with a count of 1.
+ * The HBaseBolt will handle incrementing the counter.
+ */
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ collector.emit(tuple(input.getValues().get(0), 1));
+ }
+
+ public void cleanup() {
+
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java
new file mode 100644
index 0000000..c5fc490
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.topology;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+public class WordSpout implements IRichSpout {
+ boolean isDistributed;
+ SpoutOutputCollector collector;
+ public static final String[] words = new String[] { "apple", "orange", "pineapple", "banana", "watermelon" };
+
+ public WordSpout() {
+ this(true);
+ }
+
+ public WordSpout(boolean isDistributed) {
+ this.isDistributed = isDistributed;
+ }
+
+ public boolean isDistributed() {
+ return this.isDistributed;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ }
+
+ public void close() {
+
+ }
+
+ public void nextTuple() {
+ final Random rand = new Random();
+ final String word = words[rand.nextInt(words.length)];
+ this.collector.emit(new Values(word), UUID.randomUUID());
+ Thread.yield();
+ }
+
+ public void ack(Object msgId) {
+
+ }
+
+ public void fail(Object msgId) {
+
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public void activate() {
+ }
+
+ @Override
+ public void deactivate() {
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/PrintFunction.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/PrintFunction.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/PrintFunction.java
new file mode 100644
index 0000000..cdc7690
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/PrintFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.Random;
+
+public class PrintFunction extends BaseFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PrintFunction.class);
+
+ private static final Random RANDOM = new Random();
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
+ if(RANDOM.nextInt(1000) > 995) {
+ LOG.info(tuple.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java
new file mode 100644
index 0000000..b2f0ce8
--- /dev/null
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.trident;
+
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
import org.apache.storm.hbase.topology.WordCountValueMapper;
import org.apache.storm.hbase.trident.mapper.SimpleTridentHBaseMapper;
import org.apache.storm.hbase.trident.mapper.TridentHBaseMapper;
import org.apache.storm.hbase.trident.state.HBaseQuery;
import org.apache.storm.hbase.trident.state.HBaseState;
import org.apache.storm.hbase.trident.state.HBaseStateFactory;
import org.apache.storm.hbase.trident.state.HBaseUpdater;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.testing.FixedBatchSpout;

import java.util.HashMap;
import java.util.Map;
+
+public class WordCountTrident {
+ public static StormTopology buildTopology(String hbaseRoot){
+ Fields fields = new Fields("word", "count");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", 1),
+ new Values("trident", 1),
+ new Values("needs", 1),
+ new Values("javadoc", 1)
+ );
+ spout.setCycle(true);
+
+ TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()
+ .withColumnFamily("cf")
+ .withColumnFields(new Fields("word"))
+ .withCounterFields(new Fields("count"))
+ .withRowKeyField("word");
+
+ HBaseValueMapper rowToStormValueMapper = new WordCountValueMapper();
+
+ HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria();
+ projectionCriteria.addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"));
+
+ HBaseState.Options options = new HBaseState.Options()
+ .withConfigKey(hbaseRoot)
+ .withDurability(Durability.SYNC_WAL)
+ .withMapper(tridentHBaseMapper)
+ .withProjectionCriteria(projectionCriteria)
+ .withRowToStormValueMapper(rowToStormValueMapper)
+ .withTableName("WordCount");
+
+ StateFactory factory = new HBaseStateFactory(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory, fields, new HBaseUpdater(), new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"), new HBaseQuery(), new Fields("columnName","columnValue"));
+ stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if (args.length == 1) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("wordCounter", conf, buildTopology(args[0]));
+ Thread.sleep(60 * 1000);
+ cluster.killTopology("wordCounter");
+ cluster.shutdown();
+ System.exit(0);
+ }
+ else if(args.length == 2) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[1], conf, buildTopology(args[0]));
+ } else{
+ System.out.println("Usage: TridentFileTopology <hdfs url> [topology name]");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hdfs-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/pom.xml b/examples/storm-hdfs-examples/pom.xml
new file mode 100644
index 0000000..0214ce1
--- /dev/null
+++ b/examples/storm-hdfs-examples/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-hdfs-examples</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hdfs</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
new file mode 100644
index 0000000..b1ae542
--- /dev/null
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.format.RecordFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
+import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.MoveFileAction;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HdfsFileTopology {
+ static final String SENTENCE_SPOUT_ID = "sentence-spout";
+ static final String BOLT_ID = "my-bolt";
+ static final String TOPOLOGY_NAME = "test-topology";
+
+ public static void main(String[] args) throws Exception {
+ Config config = new Config();
+ config.setNumWorkers(1);
+
+ SentenceSpout spout = new SentenceSpout();
+
+ // sync the filesystem after every 1k tuples
+ SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+ // rotate files when they reach 5MB
+ FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES);
+
+ FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+ .withPath("/tmp/foo/")
+ .withExtension(".txt");
+
+ // use "|" instead of "," for field delimiter
+ RecordFormat format = new DelimitedRecordFormat()
+ .withFieldDelimiter("|");
+
+ Yaml yaml = new Yaml();
+ InputStream in = new FileInputStream(args[1]);
+ Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
+ in.close();
+ config.put("hdfs.config", yamlConf);
+
+ HdfsBolt bolt = new HdfsBolt()
+ .withConfigKey("hdfs.config")
+ .withFsUrl(args[0])
+ .withFileNameFormat(fileNameFormat)
+ .withRecordFormat(format)
+ .withRotationPolicy(rotationPolicy)
+ .withSyncPolicy(syncPolicy)
+ .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/"));
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
+ // SentenceSpout --> MyBolt
+ builder.setBolt(BOLT_ID, bolt, 4)
+ .shuffleGrouping(SENTENCE_SPOUT_ID);
+
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+
+ cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+ waitForSeconds(120);
+ cluster.killTopology(TOPOLOGY_NAME);
+ cluster.shutdown();
+ System.exit(0);
+ } else if (args.length == 3) {
+ StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+ } else{
+ System.out.println("Usage: HdfsFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+ }
+ }
+
+ public static void waitForSeconds(int seconds) {
+ try {
+ Thread.sleep(seconds * 1000);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ public static class SentenceSpout extends BaseRichSpout {
+ private ConcurrentHashMap<UUID, Values> pending;
+ private SpoutOutputCollector collector;
+ private String[] sentences = {
+ "my dog has fleas",
+ "i like cold beverages",
+ "the dog ate my homework",
+ "don't have a cow man",
+ "i don't think i like fleas"
+ };
+ private int index = 0;
+ private int count = 0;
+ private long total = 0L;
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sentence", "timestamp"));
+ }
+
+ public void open(Map config, TopologyContext context,
+ SpoutOutputCollector collector) {
+ this.collector = collector;
+ this.pending = new ConcurrentHashMap<UUID, Values>();
+ }
+
+ public void nextTuple() {
+ Values values = new Values(sentences[index], System.currentTimeMillis());
+ UUID msgId = UUID.randomUUID();
+ this.pending.put(msgId, values);
+ this.collector.emit(values, msgId);
+ index++;
+ if (index >= sentences.length) {
+ index = 0;
+ }
+ count++;
+ total++;
+ if(count > 20000){
+ count = 0;
+ System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+ }
+ Thread.yield();
+ }
+
+ public void ack(Object msgId) {
+ this.pending.remove(msgId);
+ }
+
+ public void fail(Object msgId) {
+ System.out.println("**** RESENDING FAILED TUPLE");
+ this.collector.emit(this.pending.get(msgId), msgId);
+ }
+ }
+
+ public static class MyBolt extends BaseRichBolt {
+
+ private HashMap<String, Long> counts = null;
+ private OutputCollector collector;
+
+ public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+ this.counts = new HashMap<String, Long>();
+ this.collector = collector;
+ }
+
+ public void execute(Tuple tuple) {
+ collector.ack(tuple);
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // this bolt does not emit anything
+ }
+
+ @Override
+ public void cleanup() {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
new file mode 100644
index 0000000..86bc698
--- /dev/null
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.hdfs.bolt.format.*;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.MoveFileAction;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SequenceFileTopology {
+ static final String SENTENCE_SPOUT_ID = "sentence-spout";
+ static final String BOLT_ID = "my-bolt";
+ static final String TOPOLOGY_NAME = "test-topology";
+
+ public static void main(String[] args) throws Exception {
+ Config config = new Config();
+ config.setNumWorkers(1);
+
+ SentenceSpout spout = new SentenceSpout();
+
+ // sync the filesystem after every 1k tuples
+ SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+ // rotate files when they reach 5MB
+ FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+
+ FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+ .withPath("/tmp/source/")
+ .withExtension(".seq");
+
+ // create sequence format instance.
+ DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");
+
+ Yaml yaml = new Yaml();
+ InputStream in = new FileInputStream(args[1]);
+ Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
+ in.close();
+ config.put("hdfs.config", yamlConf);
+
+ SequenceFileBolt bolt = new SequenceFileBolt()
+ .withFsUrl(args[0])
+ .withConfigKey("hdfs.config")
+ .withFileNameFormat(fileNameFormat)
+ .withSequenceFormat(format)
+ .withRotationPolicy(rotationPolicy)
+ .withSyncPolicy(syncPolicy)
+ .withCompressionType(SequenceFile.CompressionType.RECORD)
+ .withCompressionCodec("deflate")
+ .addRotationAction(new MoveFileAction().toDestination("/tmp/dest/"));
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
+ // SentenceSpout --> MyBolt
+ builder.setBolt(BOLT_ID, bolt, 4)
+ .shuffleGrouping(SENTENCE_SPOUT_ID);
+
+
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+
+ cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+ waitForSeconds(120);
+ cluster.killTopology(TOPOLOGY_NAME);
+ cluster.shutdown();
+ System.exit(0);
+ } else if(args.length == 3) {
+ StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+ } else{
+ System.out.println("Usage: SequenceFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+ }
+ }
+
+ public static void waitForSeconds(int seconds) {
+ try {
+ Thread.sleep(seconds * 1000);
+ } catch (InterruptedException e) {
+ }
+ }
+
+
+ public static class SentenceSpout extends BaseRichSpout {
+
+
+ private ConcurrentHashMap<UUID, Values> pending;
+ private SpoutOutputCollector collector;
+ private String[] sentences = {
+ "my dog has fleas",
+ "i like cold beverages",
+ "the dog ate my homework",
+ "don't have a cow man",
+ "i don't think i like fleas"
+ };
+ private int index = 0;
+ private int count = 0;
+ private long total = 0L;
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sentence", "timestamp"));
+ }
+
+ public void open(Map config, TopologyContext context,
+ SpoutOutputCollector collector) {
+ this.collector = collector;
+ this.pending = new ConcurrentHashMap<UUID, Values>();
+ }
+
+ public void nextTuple() {
+ Values values = new Values(sentences[index], System.currentTimeMillis());
+ UUID msgId = UUID.randomUUID();
+ this.pending.put(msgId, values);
+ this.collector.emit(values, msgId);
+ index++;
+ if (index >= sentences.length) {
+ index = 0;
+ }
+ count++;
+ total++;
+ if(count > 20000){
+ count = 0;
+ System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+ }
+ Thread.yield();
+ }
+
+ public void ack(Object msgId) {
+// System.out.println("ACK");
+ this.pending.remove(msgId);
+ }
+
+ public void fail(Object msgId) {
+ System.out.println("**** RESENDING FAILED TUPLE");
+ this.collector.emit(this.pending.get(msgId), msgId);
+ }
+ }
+
+
+ public static class MyBolt extends BaseRichBolt {
+
+ private HashMap<String, Long> counts = null;
+ private OutputCollector collector;
+
+ public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+ this.counts = new HashMap<String, Long>();
+ this.collector = collector;
+ }
+
+ public void execute(Tuple tuple) {
+ collector.ack(tuple);
+ }
+
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // this bolt does not emit anything
+ }
+
+ @Override
+ public void cleanup() {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java
new file mode 100644
index 0000000..76cc2aa
--- /dev/null
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/FixedBatchSpout.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Trident batch spout that emits tuples from a fixed, caller-supplied list.
+ *
+ * <p>Each new batch takes up to {@code maxBatchSize} tuples from {@code outputs} in order. When
+ * cycling is enabled via {@link #setCycle(boolean)}, the spout wraps back to the first tuple after
+ * the last one, so every batch is full. Otherwise it stops once every tuple has been emitted, and
+ * later batches are empty. Batches are kept until acked, so a replayed batch id re-emits exactly
+ * the same tuples.
+ */
+public class FixedBatchSpout implements IBatchSpout {
+
+    Fields fields;
+    List<Object>[] outputs;
+    int maxBatchSize;
+    // Emitted but not-yet-acked batches keyed by batch id, so replays are identical.
+    HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+
+    /**
+     * @param fields       output fields declared by this spout
+     * @param maxBatchSize maximum number of tuples placed in one batch
+     * @param outputs      tuples to emit, in order
+     */
+    public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
+        this.fields = fields;
+        this.outputs = outputs;
+        this.maxBatchSize = maxBatchSize;
+    }
+
+    // Position in outputs of the next tuple to put into a new batch.
+    int index = 0;
+    boolean cycle = false;
+
+    /**
+     * Enables or disables wrapping back to the first tuple once the last one has been emitted.
+     */
+    public void setCycle(boolean cycle) {
+        this.cycle = cycle;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) {
+        index = 0;
+    }
+
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        List<List<Object>> batch = this.batches.get(batchId);
+        if (batch == null) {
+            batch = new ArrayList<List<Object>>();
+            // Wrap to the start only when cycling is enabled; an empty outputs array yields an
+            // empty batch instead of indexing outputs[0].
+            while (batch.size() < maxBatchSize && outputs.length > 0) {
+                if (index >= outputs.length) {
+                    if (!cycle) {
+                        break;
+                    }
+                    index = 0;
+                }
+                batch.add(outputs[index++]);
+            }
+            this.batches.put(batchId, batch);
+        }
+        for (List<Object> list : batch) {
+            collector.emit(list);
+        }
+    }
+
+    @Override
+    public void ack(long batchId) {
+        this.batches.remove(batchId);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        // Limit this spout to a single task.
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1);
+        return conf;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return fields;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/97fe209e/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
new file mode 100644
index 0000000..8f75c45
--- /dev/null
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.hdfs.common.rotation.MoveFileAction;
+import org.apache.storm.hdfs.trident.format.*;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Map;
+
+public class TridentFileTopology {
+
+ public static StormTopology buildTopology(String hdfsUrl){
+ FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence", "key"), 1000, new Values("the cow jumped over the moon", 1l),
+ new Values("the man went to the store and bought some candy", 2l), new Values("four score and seven years ago", 3l),
+ new Values("how many apples can you eat", 4l), new Values("to be or not to be the person", 5l));
+ spout.setCycle(true);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ Fields hdfsFields = new Fields("sentence", "key");
+
+ FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+ .withPath("/tmp/trident")
+ .withPrefix("trident")
+ .withExtension(".txt");
+
+ RecordFormat recordFormat = new DelimitedRecordFormat()
+ .withFields(hdfsFields);
+
+ FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
+
+ HdfsState.Options options = new HdfsState.HdfsFileOptions()
+ .withFileNameFormat(fileNameFormat)
+ .withRecordFormat(recordFormat)
+ .withRotationPolicy(rotationPolicy)
+ .withFsUrl(hdfsUrl)
+ .withConfigKey("hdfs.config");
+
+ StateFactory factory = new HdfsStateFactory().withOptions(options);
+
+ TridentState state = stream
+ .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
+
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+
+ Yaml yaml = new Yaml();
+ InputStream in = new FileInputStream(args[1]);
+ Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
+ in.close();
+ conf.put("hdfs.config", yamlConf);
+
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("wordCounter", conf, buildTopology(args[0]));
+ Thread.sleep(120 * 1000);
+ } else if(args.length == 3) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0]));
+ } else{
+ System.out.println("Usage: TridentFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+ }
+ }
+}