You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/07 05:40:00 UTC
[2/4] storm git commit: [STORM-2457] update and modify
storm/hbase/README.md and storm-hbase.md
[STORM-2457] update and modify storm/hbase/README.md and storm-hbase.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dcf6d88a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dcf6d88a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dcf6d88a
Branch: refs/heads/master
Commit: dcf6d88a4587b709840fb088bb3f46909cbedcfa
Parents: c652d3f
Author: liuzhaokun <li...@zte.com.cn>
Authored: Thu Apr 6 13:20:09 2017 +0800
Committer: \u5218\u5146\u576410206665 <li...@zte.com.cn>
Committed: Thu Apr 6 13:20:09 2017 +0800
----------------------------------------------------------------------
docs/storm-hbase.md | 56 +++++++++++++++++++++++++++++--------
external/storm-hbase/README.md | 38 +++++++++++++++++--------
2 files changed, 70 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dcf6d88a/docs/storm-hbase.md
----------------------------------------------------------------------
diff --git a/docs/storm-hbase.md b/docs/storm-hbase.md
index 7f4fb62..e873f31 100644
--- a/docs/storm-hbase.md
+++ b/docs/storm-hbase.md
@@ -111,6 +111,16 @@ To use the `HBaseBolt`, construct it with the name of the table to write to, an
```java
HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
```
+
+ HBaseBolt params
+
+|Arg |Description | Type | Default |
+|--- |--- |---
+|writeToWAL | To turn Durability SYNC_WAL or SKIP_WAL | Boolean (Optional) | True |
+|configKey | Any Hbase related configs | Map (Optional) | |
+|batchSize | Max no.of Tuples batched together to write to HBase | Int (Optional) | 15000 |
+|flushIntervalSecs| (In seconds) If > 0 HBase Bolt will periodically flush transaction batches. Enabling this is recommended to avoid tuple timeouts while waiting for a batch to fill up. | Int (Optional) | 0 |
+
The `HBaseBolt` will delegate to the `mapper` instance to figure out how to persist tuple data to HBase.
@@ -129,7 +139,7 @@ Each of the value returned by this function will be emitted by the `HBaseLookupB
The `declareOutputFields` should be used to declare the outputFields of the `HBaseLookupBolt`.
-There is an example implementation in `src/test/java` directory.
+There is an example implementation in `examples/storm-hbase-examples/src/main/java` directory.
###HBaseProjectionCriteria
This class allows you to specify the projection criteria for your HBase Get function. This is optional parameter
@@ -164,9 +174,17 @@ The `HBaseLookupBolt` will use the mapper to get rowKey to lookup for. It will u
figure out which columns to include in the result and it will leverage the `HBaseRowToStormValueMapper` to get the
values to be emitted by the bolt.
-You can look at an example topology LookupWordCount.java under `src/test/java`.
+In addition, the `HBaseLookupBolt` supports bolt-side HBase result caching using an in-memory LRU cache using Caffeine. To enable caching:
+
+`hbase.cache.enable` - to enable caching (default false)
+
+`hbase.cache.ttl.seconds` - set time to live for LRU cache in seconds (default 300)
+
+`hbase.cache.size` - set size of the cache (default 1000)
+
+You can look at an example topology LookupWordCount.java under `examples/storm-hbase-examples/src/main/java`.
## Example: Persistent Word Count
-A runnable example can be found in the `src/test/java` directory.
+A runnable example can be found in the `examples/storm-hbase-examples/src/main/java` directory.
### Setup
The following steps assume you are running HBase locally, or there is an `hbase-site.xml` on the
@@ -204,6 +222,12 @@ public class PersistentWordCount {
public static void main(String[] args) throws Exception {
Config config = new Config();
+ Map<String, Object> hbConf = new HashMap<String, Object>();
+ if(args.length > 0){
+ hbConf.put("hbase.rootdir", args[0]);
+ }
+ config.put("hbase.conf", hbConf);
+
WordSpout spout = new WordSpout();
WordCounter bolt = new WordCounter();
@@ -213,7 +237,8 @@ public class PersistentWordCount {
.withCounterFields(new Fields("count"))
.withColumnFamily("cf");
- HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
+ HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
+ .withConfigKey("hbase.conf");
// wordSpout ==> countBolt ==> HBaseBolt
@@ -224,16 +249,23 @@ public class PersistentWordCount {
builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
- if (args.length == 0) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", config, builder.createTopology());
- Thread.sleep(10000);
- cluster.killTopology("test");
- cluster.shutdown();
+ if (args.length == 1) {
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
+ Thread.sleep(30000);
+ }
System.exit(0);
- } else {
+ } else if (args.length == 2) {
+ StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+ } else if (args.length == 4) {
+ System.out.println("hdfs url: " + args[0] + ", keytab file: " + args[2] +
+ ", principal name: " + args[3] + ", toplogy name: " + args[1]);
+ hbConf.put(HBaseSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);
+ hbConf.put(HBaseSecurityUtil.STORM_USER_NAME_KEY, args[3]);
config.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], config, builder.createTopology());
+ StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+ } else {
+ System.out.println("Usage: PersistentWordCount <hbase.rootdir> [topology name] [keytab file] [principal name]");
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/dcf6d88a/external/storm-hbase/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hbase/README.md b/external/storm-hbase/README.md
index 9efb663..95d5c6c 100644
--- a/external/storm-hbase/README.md
+++ b/external/storm-hbase/README.md
@@ -135,7 +135,7 @@ Each of the value returned by this function will be emitted by the `HBaseLookupB
The `declareOutputFields` should be used to declare the outputFields of the `HBaseLookupBolt`.
-There is an example implementation in `src/test/java` directory.
+There is an example implementation in `examples/storm-hbase-examples/src/main/java` directory.
###HBaseProjectionCriteria
This class allows you to specify the projection criteria for your HBase Get function. This is optional parameter
@@ -178,9 +178,9 @@ In addition, the `HBaseLookupBolt` supports bolt-side HBase result caching using
`hbase.cache.size` - set size of the cache (default 1000)
-You can look at an example topology LookupWordCount.java under `src/test/java`.
+You can look at an example topology LookupWordCount.java under `examples/storm-hbase-examples/src/main/java`.
## Example: Persistent Word Count
-A runnable example can be found in the `src/test/java` directory.
+A runnable example can be found in the `examples/storm-hbase-examples/src/main/java` directory.
### Setup
The following steps assume you are running HBase locally, or there is an `hbase-site.xml` on the
@@ -218,6 +218,12 @@ public class PersistentWordCount {
public static void main(String[] args) throws Exception {
Config config = new Config();
+ Map<String, Object> hbConf = new HashMap<String, Object>();
+ if(args.length > 0){
+ hbConf.put("hbase.rootdir", args[0]);
+ }
+ config.put("hbase.conf", hbConf);
+
WordSpout spout = new WordSpout();
WordCounter bolt = new WordCounter();
@@ -227,7 +233,8 @@ public class PersistentWordCount {
.withCounterFields(new Fields("count"))
.withColumnFamily("cf");
- HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
+ HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
+ .withConfigKey("hbase.conf");
// wordSpout ==> countBolt ==> HBaseBolt
@@ -238,16 +245,23 @@ public class PersistentWordCount {
builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
- if (args.length == 0) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", config, builder.createTopology());
- Thread.sleep(10000);
- cluster.killTopology("test");
- cluster.shutdown();
+ if (args.length == 1) {
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
+ Thread.sleep(30000);
+ }
System.exit(0);
- } else {
+ } else if (args.length == 2) {
+ StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+ } else if (args.length == 4) {
+ System.out.println("hdfs url: " + args[0] + ", keytab file: " + args[2] +
+ ", principal name: " + args[3] + ", toplogy name: " + args[1]);
+ hbConf.put(HBaseSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);
+ hbConf.put(HBaseSecurityUtil.STORM_USER_NAME_KEY, args[3]);
config.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], config, builder.createTopology());
+ StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+ } else {
+ System.out.println("Usage: PersistentWordCount <hbase.rootdir> [topology name] [keytab file] [principal name]");
}
}
}