You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/16 03:04:23 UTC
[6/8] storm git commit: STORM-1079. Fixed reset of tupleBatch.
STORM-1079. Fixed reset of tupleBatch.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/99da29b4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/99da29b4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/99da29b4
Branch: refs/heads/master
Commit: 99da29b443c89580be34512b67d780c811d6d38a
Parents: fc03910
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Wed Oct 14 14:38:13 2015 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Wed Oct 14 14:38:13 2015 -0700
----------------------------------------------------------------------
.../org/apache/storm/hbase/bolt/HBaseBolt.java | 27 ++++++++++----------
1 file changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/99da29b4/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index d470892..ea17b2c 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -93,34 +93,35 @@ public class HBaseBolt extends AbstractHBaseBolt {
@Override
public void execute(Tuple tuple) {
boolean flush = false;
- if (TupleUtils.isTick(tuple)) {
- LOG.debug("TICK received! current batch status [" + tupleBatch.size() + "/" + batchSize + "]");
- flush = true;
- } else {
- byte[] rowKey = this.mapper.rowKey(tuple);
- ColumnList cols = this.mapper.columns(tuple);
- List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
- batchMutations.addAll(mutations);
- tupleBatch.add(tuple);
- if (tupleBatch.size() >= batchSize) {
+ try {
+ if (TupleUtils.isTick(tuple)) {
+ LOG.debug("TICK received! current batch status [" + tupleBatch.size() + "/" + batchSize + "]");
flush = true;
+ } else {
+ byte[] rowKey = this.mapper.rowKey(tuple);
+ ColumnList cols = this.mapper.columns(tuple);
+ List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
+ batchMutations.addAll(mutations);
+ tupleBatch.add(tuple);
+ if (tupleBatch.size() >= batchSize) {
+ flush = true;
+ }
}
- }
- try {
if (flush && !tupleBatch.isEmpty()) {
this.hBaseClient.batchMutate(batchMutations);
LOG.debug("acknowledging tuples after batchMutate");
for(Tuple t : tupleBatch) {
collector.ack(t);
}
+ tupleBatch.clear();
+ batchMutations.clear();
}
} catch(Exception e){
this.collector.reportError(e);
for (Tuple t : tupleBatch) {
collector.fail(t);
}
- } finally {
tupleBatch.clear();
batchMutations.clear();
}