You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/04 04:05:52 UTC
[10/50] [abbrv] storm git commit: add HBase example
add HBase example
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4a1db96f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4a1db96f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4a1db96f
Branch: refs/heads/0.10.x-branch
Commit: 4a1db96fc38fb438c3d4433381e719ca16d63bc8
Parents: a791604
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Apr 7 23:23:05 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Apr 7 23:23:05 2015 -0400
----------------------------------------------------------------------
flux-core/pom.xml | 6 +
.../java/org/apache/storm/flux/TCKTest.java | 10 ++
.../test/resources/configs/simple_hbase.yaml | 120 +++++++++++++++++++
flux-examples/pom.xml | 5 +
.../storm/flux/examples/WordCountClient.java | 63 ++++++++++
.../apache/storm/flux/examples/WordCounter.java | 71 +++++++++++
flux-examples/src/main/resources/hbase-site.xml | 36 ++++++
.../src/main/resources/simple_hbase.yaml | 91 ++++++++++++++
8 files changed, 402 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-core/pom.xml
----------------------------------------------------------------------
diff --git a/flux-core/pom.xml b/flux-core/pom.xml
index 0d72ead..fe2e301 100644
--- a/flux-core/pom.xml
+++ b/flux-core/pom.xml
@@ -50,6 +50,12 @@
<version>${storm.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hbase</artifactId>
+ <!-- use the ${storm.version} property already referenced by the preceding dependency instead of a hard-coded SNAPSHOT -->
+ <version>${storm.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<resources>
http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
----------------------------------------------------------------------
diff --git a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index 6580ef7..27abfbe 100644
--- a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -82,6 +82,16 @@ public class TCKTest {
}
@Test
+ public void testHbase() throws Exception {
+     // Load the HBase word-count definition from the test resources.
+     final TopologyDef hbaseDef = FluxParser.parseResource("/configs/simple_hbase.yaml", false, true, null, false);
+
+     // Build the topology configuration and wrap both in an execution context.
+     final Config hbaseConf = FluxBuilder.buildConfig(hbaseDef);
+     final ExecutionContext hbaseContext = new ExecutionContext(hbaseDef, hbaseConf);
+
+     // The builder must produce a topology, and that topology must validate.
+     final StormTopology built = FluxBuilder.buildTopology(hbaseContext);
+     assertNotNull(built);
+     built.validate();
+ }
+
+ @Test
public void testIncludes() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-core/src/test/resources/configs/simple_hbase.yaml
----------------------------------------------------------------------
diff --git a/flux-core/src/test/resources/configs/simple_hbase.yaml b/flux-core/src/test/resources/configs/simple_hbase.yaml
new file mode 100644
index 0000000..e407bd9
--- /dev/null
+++ b/flux-core/src/test/resources/configs/simple_hbase.yaml
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Test ability to wire together an HBase word-count topology (word spout ==> count bolt ==> HBase bolt)
+---
+
+# topology definition
+# name to be used when submitting
+name: "hbase-wordcount"
+
+# Components
+# Components are analogous to Spring beans. They are meant to be used as constructor,
+# property(setter), and builder arguments.
+#
+# for the time being, components must be declared in the order they are referenced
+
+# WordSpout spout = new WordSpout();
+# WordCounter bolt = new WordCounter();
+#
+# SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+# .withRowKeyField("word")
+# .withColumnFields(new Fields("word"))
+# .withCounterFields(new Fields("count"))
+# .withColumnFamily("cf");
+#
+# HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
+# .withConfigKey("hbase.conf");
+#
+#
+# // wordSpout ==> countBolt ==> HBaseBolt
+# TopologyBuilder builder = new TopologyBuilder();
+#
+# builder.setSpout(WORD_SPOUT, spout, 1);
+# builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+# builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+
+
+components:
+ - id: "columnFields"
+ className: "backtype.storm.tuple.Fields"
+ constructorArgs:
+ - ["word"]
+
+ - id: "counterFields"
+ className: "backtype.storm.tuple.Fields"
+ constructorArgs:
+ - ["count"]
+
+ - id: "mapper"
+ className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper"
+ configMethods:
+ - name: "withRowKeyField"
+ args: ["word"]
+ - name: "withColumnFields"
+ args: [ref: "columnFields"]
+ - name: "withCounterFields"
+ args: [ref: "counterFields"]
+ - name: "withColumnFamily"
+ args: ["cf"]
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+ topology.workers: 1
+ hbase.conf:
+ hbase.rootdir: "hdfs://hadoop:54310/hbase"
+ hbase.zookeeper.quorum: "hadoop"
+
+# spout definitions
+spouts:
+ - id: "word-spout"
+ className: "backtype.storm.testing.TestWordSpout"
+ parallelism: 1
+
+# bolt definitions
+
+bolts:
+ - id: "count-bolt"
+ className: "backtype.storm.testing.TestWordCounter"
+
+ - id: "hbase-bolt"
+ className: "org.apache.storm.hbase.bolt.HBaseBolt"
+ constructorArgs:
+ - "WordCount" # HBase table name
+ - ref: "mapper"
+ configMethods:
+ - name: "withConfigKey"
+ args: ["hbase.conf"]
+ parallelism: 1
+
+
+streams:
+ - name: "" # name isn't used (placeholder for logging, UI, etc.)
+ from: "word-spout"
+ to: "count-bolt"
+ grouping:
+ type: SHUFFLE
+
+ - name: "" # name isn't used (placeholder for logging, UI, etc.)
+ from: "count-bolt"
+ to: "hbase-bolt"
+ grouping:
+ type: FIELDS
+ args: ["word"]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flux-examples/pom.xml b/flux-examples/pom.xml
index 09db717..2321074 100644
--- a/flux-examples/pom.xml
+++ b/flux-examples/pom.xml
@@ -50,6 +50,11 @@
<artifactId>storm-hdfs</artifactId>
<version>${storm.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hbase</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
----------------------------------------------------------------------
diff --git a/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
new file mode 100644
index 0000000..55873d5
--- /dev/null
+++ b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.examples;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Connects to the 'WordCount' HBase table and prints counts for each word.
+ *
+ * Assumes you have run (or are running) the YAML topology definition in
+ * <code>simple_hbase.yaml</code>
+ *
+ * You will also need to modify `src/main/resources/hbase-site.xml`
+ * to point to your HBase instance, and then repackage with `mvn package`.
+ * This is a known issue.
+ *
+ * An optional first command-line argument overrides the
+ * <code>hbase.rootdir</code> setting of the loaded HBase configuration.
+ *
+ * Words that have no counter stored yet are reported as such instead of
+ * aborting the run.
+ */
+public class WordCountClient {
+
+    /**
+     * Looks up a fixed list of words in the 'WordCount' table and prints the
+     * "cf:word" and "cf:count" cells of each row to standard output.
+     *
+     * @param args optional; <code>args[0]</code>, when present, is used as <code>hbase.rootdir</code>
+     * @throws Exception if the table cannot be opened, read, or closed
+     */
+    public static void main(String[] args) throws Exception {
+        Configuration config = HBaseConfiguration.create();
+        if (args.length > 0) {
+            config.set("hbase.rootdir", args[0]);
+        }
+
+        String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
+        byte[] family = Bytes.toBytes("cf");
+        byte[] countQualifier = Bytes.toBytes("count");
+        byte[] wordQualifier = Bytes.toBytes("word");
+
+        HTable table = new HTable(config, "WordCount");
+        try {
+            for (String word : words) {
+                Result result = table.get(new Get(Bytes.toBytes(word)));
+
+                byte[] countBytes = result.getValue(family, countQualifier);
+                byte[] wordBytes = result.getValue(family, wordQualifier);
+
+                // getValue() yields null for a cell that does not exist; decoding a
+                // null counter with Bytes.toLong() would throw, so report and move on.
+                if (countBytes == null) {
+                    System.out.println("Word: '" + word + "' has no count in table 'WordCount'");
+                    continue;
+                }
+
+                // Fall back to the row key when the "word" cell itself is absent.
+                String wordStr = (wordBytes != null) ? Bytes.toString(wordBytes) : word;
+                System.out.println(wordStr);
+                long count = Bytes.toLong(countBytes);
+                System.out.println("Word: '" + wordStr + "', Count: " + count);
+            }
+        } finally {
+            // Always release the table's client-side resources, even if a lookup fails.
+            table.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
----------------------------------------------------------------------
diff --git a/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
new file mode 100644
index 0000000..f7c80c7
--- /dev/null
+++ b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.examples;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static backtype.storm.utils.Utils.tuple;
+
+/**
+ * Bolt used by the HBase example. For every incoming tuple it emits a
+ * ("word", "count") pair: "word" is the first value of the incoming tuple
+ * and "count" is always `1`.
+ *
+ * No running totals are kept here. The example relies on the downstream
+ * HBase bolt to do the counting, so each emitted `1` just increments the
+ * HBase counter by one.
+ */
+public class WordCounter extends BaseBasicBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(WordCounter.class);
+
+    /**
+     * Declares the two output fields, "word" and "count".
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+
+    /**
+     * Returns no component-specific configuration.
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    /**
+     * Nothing to set up.
+     */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map stormConf, TopologyContext context) {
+    }
+
+    /**
+     * Emits the first value of the incoming tuple together with a count of 1.
+     * The HBaseBolt will handle incrementing the counter.
+     */
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        final Object word = input.getValues().get(0);
+        collector.emit(tuple(word, 1));
+    }
+
+    /**
+     * Nothing to tear down.
+     */
+    @Override
+    public void cleanup() {
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/flux-examples/src/main/resources/hbase-site.xml b/flux-examples/src/main/resources/hbase-site.xml
new file mode 100644
index 0000000..06c3031
--- /dev/null
+++ b/flux-examples/src/main/resources/hbase-site.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+ <property>
+ <name>hbase.cluster.distributed</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hbase.rootdir</name>
+ <value>hdfs://hadoop:54310/hbase</value>
+ </property>
+ <property>
+ <name>hbase.zookeeper.quorum</name>
+ <value>hadoop</value>
+ </property>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/resources/simple_hbase.yaml
----------------------------------------------------------------------
diff --git a/flux-examples/src/main/resources/simple_hbase.yaml b/flux-examples/src/main/resources/simple_hbase.yaml
new file mode 100644
index 0000000..5eb70ed
--- /dev/null
+++ b/flux-examples/src/main/resources/simple_hbase.yaml
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+---
+# NOTE: To use this example, you will need to modify `src/main/resources/hbase-site.xml`
+# to point to your HBase instance, and then repackage with `mvn package`.
+# This is a known issue.
+
+# topology definition
+# name to be used when submitting
+name: "hbase-persistent-wordcount"
+
+# Components
+components:
+ - id: "columnFields"
+ className: "backtype.storm.tuple.Fields"
+ constructorArgs:
+ - ["word"]
+
+ - id: "counterFields"
+ className: "backtype.storm.tuple.Fields"
+ constructorArgs:
+ - ["count"]
+
+ - id: "mapper"
+ className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper"
+ configMethods:
+ - name: "withRowKeyField"
+ args: ["word"]
+ - name: "withColumnFields"
+ args: [ref: "columnFields"]
+ - name: "withCounterFields"
+ args: [ref: "counterFields"]
+ - name: "withColumnFamily"
+ args: ["cf"]
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+config:
+ topology.workers: 1
+ hbase.conf:
+ hbase.rootdir: "hdfs://hadoop:54310/hbase"
+
+# spout definitions
+spouts:
+ - id: "word-spout"
+ className: "backtype.storm.testing.TestWordSpout"
+ parallelism: 1
+
+# bolt definitions
+
+bolts:
+ - id: "count-bolt"
+ className: "org.apache.storm.flux.examples.WordCounter"
+ parallelism: 1
+
+ - id: "hbase-bolt"
+ className: "org.apache.storm.hbase.bolt.HBaseBolt"
+ constructorArgs:
+ - "WordCount" # HBase table name
+ - ref: "mapper"
+ configMethods:
+ - name: "withConfigKey"
+ args: ["hbase.conf"]
+ parallelism: 1
+
+streams:
+ - name: "" # name isn't used (placeholder for logging, UI, etc.)
+ from: "word-spout"
+ to: "count-bolt"
+ grouping:
+ type: SHUFFLE
+
+ - name: "" # name isn't used (placeholder for logging, UI, etc.)
+ from: "count-bolt"
+ to: "hbase-bolt"
+ grouping:
+ type: FIELDS
+ args: ["word"]