You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/16 03:04:20 UTC

[3/8] storm git commit: STORM-1079. Batch Puts to HBase.

STORM-1079. Batch Puts to HBase.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b53b9eb3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b53b9eb3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b53b9eb3

Branch: refs/heads/master
Commit: b53b9eb3c12f8caa7180c880ba52c550535ef192
Parents: 7710eef
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Sat Oct 3 08:06:57 2015 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Sat Oct 3 08:06:57 2015 -0700

----------------------------------------------------------------------
 .../org/apache/storm/hbase/bolt/AbstractHBaseBolt.java  |  1 -
 .../java/org/apache/storm/hbase/bolt/HBaseBolt.java     | 12 ++++++++----
 2 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b53b9eb3/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
index 3372192..404aa7a 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
@@ -74,6 +74,5 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt {
         Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
         hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
         this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, tableName);
-
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b53b9eb3/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index 7eeca77..4cdf388 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -74,8 +74,9 @@ public class HBaseBolt  extends AbstractHBaseBolt {
     @Override
     public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> conf = super.getComponentConfiguration();
-        if (conf == null)
+        if (conf == null) {
             conf = new Config();
+        }
 
         if (flushIntervalSecs > 0) {
             LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + "]");
@@ -98,21 +99,24 @@ public class HBaseBolt  extends AbstractHBaseBolt {
             List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
             batchMutations.addAll(mutations);
             tupleBatch.add(tuple);
-            if (tupleBatch.size() >= batchSize)
+            if (tupleBatch.size() >= batchSize) {
                 flush = true;
+            }
         }
 
         try {
             if (flush && !tupleBatch.isEmpty()) {
                 this.hBaseClient.batchMutate(batchMutations);
                 LOG.debug("acknowledging tuples after batchMutate");
-                for(Tuple t : tupleBatch)
+                for(Tuple t : tupleBatch) {
                     collector.ack(t);
+                }
             }
         } catch(Exception e){
             this.collector.reportError(e);
-            for (Tuple t : tupleBatch)
+            for (Tuple t : tupleBatch) {
                 collector.fail(t);
+            }
         } finally {
             tupleBatch.clear();
             batchMutations.clear();