You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/16 03:04:21 UTC
[4/8] storm git commit: STORM-1079. Batch Puts to HBase. Add default flushIntervalSecs.
STORM-1079. Batch Puts to HBase. Add default flushIntervalSecs.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a25ab79
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a25ab79
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a25ab79
Branch: refs/heads/master
Commit: 9a25ab79b4457c4ed98b64c96218e0462efe72c4
Parents: b53b9eb
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Oct 8 09:13:54 2015 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Oct 8 09:13:54 2015 -0700
----------------------------------------------------------------------
.../main/java/org/apache/storm/hbase/bolt/HBaseBolt.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9a25ab79/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index 4cdf388..34e2eba 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -78,11 +78,14 @@ public class HBaseBolt extends AbstractHBaseBolt {
conf = new Config();
}
- if (flushIntervalSecs > 0) {
- LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + "]");
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
+ if (conf.containsKey("topology.message.timeout.secs") && tickTupleInterval == 0) {
+ Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString());
+ flushIntervalSecs = (int)(Math.floor(topologyTimeout / 2));
+ LOG.debug("Setting flush interval to [" + flushIntervalSecs + "] based on topology.message.timeout.secs");
}
+ LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + "]");
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
return conf;
}