You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/04 04:05:52 UTC

[10/50] [abbrv] storm git commit: add HBase example

add HBase example


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4a1db96f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4a1db96f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4a1db96f

Branch: refs/heads/0.10.x-branch
Commit: 4a1db96fc38fb438c3d4433381e719ca16d63bc8
Parents: a791604
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Apr 7 23:23:05 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Apr 7 23:23:05 2015 -0400

----------------------------------------------------------------------
 flux-core/pom.xml                               |   6 +
 .../java/org/apache/storm/flux/TCKTest.java     |  10 ++
 .../test/resources/configs/simple_hbase.yaml    | 120 +++++++++++++++++++
 flux-examples/pom.xml                           |   5 +
 .../storm/flux/examples/WordCountClient.java    |  63 ++++++++++
 .../apache/storm/flux/examples/WordCounter.java |  71 +++++++++++
 flux-examples/src/main/resources/hbase-site.xml |  36 ++++++
 .../src/main/resources/simple_hbase.yaml        |  91 ++++++++++++++
 8 files changed, 402 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-core/pom.xml
----------------------------------------------------------------------
diff --git a/flux-core/pom.xml b/flux-core/pom.xml
index 0d72ead..fe2e301 100644
--- a/flux-core/pom.xml
+++ b/flux-core/pom.xml
@@ -50,6 +50,12 @@
             <version>${storm.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hbase</artifactId>
+            <version>0.11.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <resources>

http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
----------------------------------------------------------------------
diff --git a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index 6580ef7..27abfbe 100644
--- a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -82,6 +82,16 @@ public class TCKTest {
     }
 
     @Test
+    public void testHbase() throws Exception {
+        // Load the HBase word-count definition from the test resources.
+        final TopologyDef hbaseDef = FluxParser.parseResource("/configs/simple_hbase.yaml", false, true, null, false);
+        // Build the execution context from the definition and its derived config.
+        final ExecutionContext ctx = new ExecutionContext(hbaseDef, FluxBuilder.buildConfig(hbaseDef));
+        final StormTopology built = FluxBuilder.buildTopology(ctx);
+        assertNotNull(built);
+        built.validate();
+    }
+
+    @Test
     public void testIncludes() throws Exception {
         TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null, false);
         Config conf = FluxBuilder.buildConfig(topologyDef);

http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-core/src/test/resources/configs/simple_hbase.yaml
----------------------------------------------------------------------
diff --git a/flux-core/src/test/resources/configs/simple_hbase.yaml b/flux-core/src/test/resources/configs/simple_hbase.yaml
new file mode 100644
index 0000000..e407bd9
--- /dev/null
+++ b/flux-core/src/test/resources/configs/simple_hbase.yaml
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Test ability to wire together an HBase word-count topology
+---
+
+# topology definition
+# name to be used when submitting
+name: "hbase-wordcount"
+
+# Components
+# Components are analogous to Spring beans. They are meant to be used as constructor,
+# property(setter), and builder arguments.
+#
+# for the time being, components must be declared in the order they are referenced
+
+#        WordSpout spout = new WordSpout();
+#        WordCounter bolt = new WordCounter();
+#
+#        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+#                .withRowKeyField("word")
+#                .withColumnFields(new Fields("word"))
+#                .withCounterFields(new Fields("count"))
+#                .withColumnFamily("cf");
+#
+#        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
+#                .withConfigKey("hbase.conf");
+#
+#
+#        // wordSpout ==> countBolt ==> HBaseBolt
+#        TopologyBuilder builder = new TopologyBuilder();
+#
+#        builder.setSpout(WORD_SPOUT, spout, 1);
+#        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+#        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+
+
+components:
+  - id: "columnFields"
+    className: "backtype.storm.tuple.Fields"
+    constructorArgs:
+      - ["word"]
+
+  - id: "counterFields"
+    className: "backtype.storm.tuple.Fields"
+    constructorArgs:
+      - ["count"]
+
+  - id: "mapper"
+    className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper"
+    configMethods:
+      - name: "withRowKeyField"
+        args: ["word"]
+      - name: "withColumnFields"
+        args: [ref: "columnFields"]
+      - name: "withCounterFields"
+        args: [ref: "counterFields"]
+      - name: "withColumnFamily"
+        args: ["cf"]
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+  topology.workers: 1
+  hbase.conf:
+    hbase.rootdir: "hdfs://hadoop:54310/hbase"
+    hbase.zookeeper.quorum: "hadoop"
+
+# spout definitions
+spouts:
+  - id: "word-spout"
+    className: "backtype.storm.testing.TestWordSpout"
+    parallelism: 1
+
+# bolt definitions
+
+bolts:
+  - id: "count-bolt"
+    className: "backtype.storm.testing.TestWordCounter"
+
+  - id: "hbase-bolt"
+    className: "org.apache.storm.hbase.bolt.HBaseBolt"
+    constructorArgs:
+      - "WordCount" # HBase table name
+      - ref: "mapper"
+    configMethods:
+      - name: "withConfigKey"
+        args: ["hbase.conf"]
+    parallelism: 1
+
+
+streams:
+  - name: "" # name isn't used (placeholder for logging, UI, etc.)
+    from: "word-spout"
+    to: "count-bolt"
+    grouping:
+      type: SHUFFLE
+
+  - name: "" # name isn't used (placeholder for logging, UI, etc.)
+    from: "count-bolt"
+    to: "hbase-bolt"
+    grouping:
+      type: FIELDS
+      args: ["word"]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flux-examples/pom.xml b/flux-examples/pom.xml
index 09db717..2321074 100644
--- a/flux-examples/pom.xml
+++ b/flux-examples/pom.xml
@@ -50,6 +50,11 @@
             <artifactId>storm-hdfs</artifactId>
             <version>${storm.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hbase</artifactId>
+            <version>${storm.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
----------------------------------------------------------------------
diff --git a/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
new file mode 100644
index 0000000..55873d5
--- /dev/null
+++ b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.examples;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Connects to the 'WordCount' HBase table and prints counts for each word.
+ *
+ * Assumes you have run (or are running) the YAML topology definition in
+ * <code>simple_hbase.yaml</code>
+ *
+ * You will also need to modify `src/main/resources/hbase-site.xml`
+ * to point to your HBase instance, and then repackage with `mvn package`.
+ * This is a known issue.
+ *
+ */
+public class WordCountClient {
+
+    /**
+     * Looks up a fixed set of words in the 'WordCount' table and prints the
+     * counter stored for each one.
+     *
+     * @param args optional; when present, <code>args[0]</code> replaces the
+     *             <code>hbase.rootdir</code> value of the HBase configuration
+     * @throws Exception if the table cannot be opened, read, or closed
+     */
+    public static void main(String[] args) throws Exception {
+        Configuration config = HBaseConfiguration.create();
+        if(args.length > 0){
+            config.set("hbase.rootdir", args[0]);
+        }
+
+        // Column coordinates are the same for every lookup, so encode them once.
+        byte[] family = Bytes.toBytes("cf");
+        byte[] countQualifier = Bytes.toBytes("count");
+        byte[] wordQualifier = Bytes.toBytes("word");
+        String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
+
+        HTable table = new HTable(config, "WordCount");
+        try {
+            for (String word : words) {
+                Result result = table.get(new Get(Bytes.toBytes(word)));
+
+                byte[] countBytes = result.getValue(family, countQualifier);
+                byte[] wordBytes = result.getValue(family, wordQualifier);
+
+                // getValue() yields null when the cell is absent (e.g. the topology
+                // has not written this word yet); Bytes.toLong(null) would throw.
+                if (countBytes == null) {
+                    System.out.println("Word: '" + word + "', Count: <not found>");
+                    continue;
+                }
+
+                // Fall back to the row key if the 'word' column itself is missing.
+                String wordStr = (wordBytes == null) ? word : Bytes.toString(wordBytes);
+                long count = Bytes.toLong(countBytes);
+                System.out.println("Word: '" + wordStr + "', Count: " + count);
+            }
+        } finally {
+            // Always release the table, even if a lookup fails part-way through.
+            table.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
----------------------------------------------------------------------
diff --git a/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
new file mode 100644
index 0000000..f7c80c7
--- /dev/null
+++ b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.examples;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static backtype.storm.utils.Utils.tuple;
+
+/**
+ * This bolt is used by the HBase example. It simply emits the first field
+ * found in the incoming tuple as "word", with a "count" of `1`.
+ *
+ * In this case, the downstream HBase bolt handles the counting, so a value
+ * of `1` will just increment the HBase counter by one.
+ */
+public class WordCounter extends BaseBasicBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(WordCounter.class);
+
+    /**
+     * Nothing to set up: this bolt holds no per-instance state.
+     *
+     * @param stormConf the configuration passed to the bolt (unused)
+     * @param context   the topology context (unused)
+     */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map stormConf, TopologyContext context) {
+    }
+
+    /**
+     * Emits the first value of the incoming tuple together with a count of 1.
+     * The HBaseBolt will handle incrementing the counter.
+     *
+     * @param input     the incoming tuple; only its first value is read
+     * @param collector the collector the (word, 1) pair is emitted to
+     */
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        collector.emit(tuple(input.getValues().get(0), 1));
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void cleanup() {
+
+    }
+
+    /**
+     * Declares the two output fields, "word" and "count", in the order they
+     * are emitted by {@link #execute(Tuple, BasicOutputCollector)}.
+     *
+     * @param declarer the declarer used to register the output fields
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+
+    /**
+     * @return always {@code null}; this bolt supplies no component-specific configuration
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/flux-examples/src/main/resources/hbase-site.xml b/flux-examples/src/main/resources/hbase-site.xml
new file mode 100644
index 0000000..06c3031
--- /dev/null
+++ b/flux-examples/src/main/resources/hbase-site.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+	<property>
+	  <name>hbase.cluster.distributed</name>
+	  <value>true</value>
+	</property>
+	<property>
+	  <name>hbase.rootdir</name>
+	  <value>hdfs://hadoop:54310/hbase</value>
+	</property>
+	<property>
+	  <name>hbase.zookeeper.quorum</name>
+	  <value>hadoop</value>
+	</property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/resources/simple_hbase.yaml
----------------------------------------------------------------------
diff --git a/flux-examples/src/main/resources/simple_hbase.yaml b/flux-examples/src/main/resources/simple_hbase.yaml
new file mode 100644
index 0000000..5eb70ed
--- /dev/null
+++ b/flux-examples/src/main/resources/simple_hbase.yaml
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+---
+# NOTE: To use this example, you will need to modify `src/main/resources/hbase-site.xml`
+# to point to your HBase instance, and then repackage with `mvn package`.
+# This is a known issue.
+
+# topology definition
+# name to be used when submitting
+name: "hbase-persistent-wordcount"
+
+# Components
+components:
+  - id: "columnFields"
+    className: "backtype.storm.tuple.Fields"
+    constructorArgs:
+      - ["word"]
+
+  - id: "counterFields"
+    className: "backtype.storm.tuple.Fields"
+    constructorArgs:
+      - ["count"]
+
+  - id: "mapper"
+    className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper"
+    configMethods:
+      - name: "withRowKeyField"
+        args: ["word"]
+      - name: "withColumnFields"
+        args: [ref: "columnFields"]
+      - name: "withCounterFields"
+        args: [ref: "counterFields"]
+      - name: "withColumnFamily"
+        args: ["cf"]
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+config:
+  topology.workers: 1
+  hbase.conf:
+    hbase.rootdir: "hdfs://hadoop:54310/hbase"
+
+# spout definitions
+spouts:
+  - id: "word-spout"
+    className: "backtype.storm.testing.TestWordSpout"
+    parallelism: 1
+
+# bolt definitions
+
+bolts:
+  - id: "count-bolt"
+    className: "org.apache.storm.flux.examples.WordCounter"
+    parallelism: 1
+
+  - id: "hbase-bolt"
+    className: "org.apache.storm.hbase.bolt.HBaseBolt"
+    constructorArgs:
+      - "WordCount" # HBase table name
+      - ref: "mapper"
+    configMethods:
+      - name: "withConfigKey"
+        args: ["hbase.conf"]
+    parallelism: 1
+
+streams:
+  - name: "" # name isn't used (placeholder for logging, UI, etc.)
+    from: "word-spout"
+    to: "count-bolt"
+    grouping:
+      type: SHUFFLE
+
+  - name: "" # name isn't used (placeholder for logging, UI, etc.)
+    from: "count-bolt"
+    to: "hbase-bolt"
+    grouping:
+      type: FIELDS
+      args: ["word"]