You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/16 03:04:21 UTC

[4/8] storm git commit: STORM-1079. Batch Puts to HBase. Add default flushIntervalSecs.

STORM-1079. Batch Puts to HBase. Add default flushIntervalSecs.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a25ab79
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a25ab79
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a25ab79

Branch: refs/heads/master
Commit: 9a25ab79b4457c4ed98b64c96218e0462efe72c4
Parents: b53b9eb
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Oct 8 09:13:54 2015 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Oct 8 09:13:54 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/storm/hbase/bolt/HBaseBolt.java    | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9a25ab79/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index 4cdf388..34e2eba 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -78,11 +78,14 @@ public class HBaseBolt  extends AbstractHBaseBolt {
             conf = new Config();
         }
 
-        if (flushIntervalSecs > 0) {
-            LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + "]");
-            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
+        if (conf.containsKey("topology.message.timeout.secs") && tickTupleInterval == 0) {
+            Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString());
+            flushIntervalSecs = (int)(Math.floor(topologyTimeout / 2));
+            LOG.debug("Setting flush interval to [" + flushIntervalSecs + "] based on topology.message.timeout.secs");
         }
 
+        LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + "]");
+        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
         return conf;
     }