You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/16 03:04:23 UTC

[6/8] storm git commit: STORM-1079. Fixed reset of tupleBatch.

STORM-1079. Fixed reset of tupleBatch.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/99da29b4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/99da29b4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/99da29b4

Branch: refs/heads/master
Commit: 99da29b443c89580be34512b67d780c811d6d38a
Parents: fc03910
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Wed Oct 14 14:38:13 2015 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Wed Oct 14 14:38:13 2015 -0700

----------------------------------------------------------------------
 .../org/apache/storm/hbase/bolt/HBaseBolt.java  | 27 ++++++++++----------
 1 file changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/99da29b4/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index d470892..ea17b2c 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -93,34 +93,35 @@ public class HBaseBolt  extends AbstractHBaseBolt {
     @Override
     public void execute(Tuple tuple) {
         boolean flush = false;
-        if (TupleUtils.isTick(tuple)) {
-            LOG.debug("TICK received! current batch status [" + tupleBatch.size() + "/" + batchSize + "]");
-            flush = true;
-        } else {
-            byte[] rowKey = this.mapper.rowKey(tuple);
-            ColumnList cols = this.mapper.columns(tuple);
-            List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
-            batchMutations.addAll(mutations);
-            tupleBatch.add(tuple);
-            if (tupleBatch.size() >= batchSize) {
+        try {
+            if (TupleUtils.isTick(tuple)) {
+                LOG.debug("TICK received! current batch status [" + tupleBatch.size() + "/" + batchSize + "]");
                 flush = true;
+            } else {
+                byte[] rowKey = this.mapper.rowKey(tuple);
+                ColumnList cols = this.mapper.columns(tuple);
+                List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
+                batchMutations.addAll(mutations);
+                tupleBatch.add(tuple);
+                if (tupleBatch.size() >= batchSize) {
+                    flush = true;
+                }
             }
-        }
 
-        try {
             if (flush && !tupleBatch.isEmpty()) {
                 this.hBaseClient.batchMutate(batchMutations);
                 LOG.debug("acknowledging tuples after batchMutate");
                 for(Tuple t : tupleBatch) {
                     collector.ack(t);
                 }
+                tupleBatch.clear();
+                batchMutations.clear();
             }
         } catch(Exception e){
             this.collector.reportError(e);
             for (Tuple t : tupleBatch) {
                 collector.fail(t);
             }
-        } finally {
             tupleBatch.clear();
             batchMutations.clear();
         }