You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/16 03:04:20 UTC
[3/8] storm git commit: STORM-1079. Batch Puts to HBase.
STORM-1079. Batch Puts to HBase.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b53b9eb3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b53b9eb3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b53b9eb3
Branch: refs/heads/master
Commit: b53b9eb3c12f8caa7180c880ba52c550535ef192
Parents: 7710eef
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Sat Oct 3 08:06:57 2015 -0700
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Sat Oct 3 08:06:57 2015 -0700
----------------------------------------------------------------------
.../org/apache/storm/hbase/bolt/AbstractHBaseBolt.java | 1 -
.../java/org/apache/storm/hbase/bolt/HBaseBolt.java | 12 ++++++++----
2 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b53b9eb3/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
index 3372192..404aa7a 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
@@ -74,6 +74,5 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt {
Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, tableName);
-
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b53b9eb3/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index 7eeca77..4cdf388 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -74,8 +74,9 @@ public class HBaseBolt extends AbstractHBaseBolt {
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = super.getComponentConfiguration();
- if (conf == null)
+ if (conf == null) {
conf = new Config();
+ }
if (flushIntervalSecs > 0) {
LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + "]");
@@ -98,21 +99,24 @@ public class HBaseBolt extends AbstractHBaseBolt {
List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
batchMutations.addAll(mutations);
tupleBatch.add(tuple);
- if (tupleBatch.size() >= batchSize)
+ if (tupleBatch.size() >= batchSize) {
flush = true;
+ }
}
try {
if (flush && !tupleBatch.isEmpty()) {
this.hBaseClient.batchMutate(batchMutations);
LOG.debug("acknowledging tuples after batchMutate");
- for(Tuple t : tupleBatch)
+ for(Tuple t : tupleBatch) {
collector.ack(t);
+ }
}
} catch(Exception e){
this.collector.reportError(e);
- for (Tuple t : tupleBatch)
+ for (Tuple t : tupleBatch) {
collector.fail(t);
+ }
} finally {
tupleBatch.clear();
batchMutations.clear();