You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/10/15 18:15:43 UTC
[30/50] git commit: STORM-519 adding tuple as an input param to
HBaseValueMapper so user can choose to emit fields from original tuple in
addition to lookup result.
STORM-519 adding tuple as an input param to HBaseValueMapper so user can choose to emit fields from original tuple in addition to lookup result.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f87bb313
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f87bb313
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f87bb313
Branch: refs/heads/security
Commit: f87bb3135b4dcfd2b73689a35bf0b73a6cc5bc75
Parents: 1babd83
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Oct 6 15:21:28 2014 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Oct 6 15:21:28 2014 -0700
----------------------------------------------------------------------
external/storm-hbase/pom.xml | 2 +-
.../main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java | 2 +-
.../org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java | 4 +++-
.../java/org/apache/storm/hbase/trident/state/HBaseState.java | 6 ++++--
.../org/apache/storm/hbase/topology/WordCountValueMapper.java | 3 ++-
5 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index f976cea..a492164 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.3-incubating-SNAPSHOT</version>
+ <version>0.9.3-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
index 12263a6..c6838be 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
@@ -66,7 +66,7 @@ public class HBaseLookupBolt extends AbstractHBaseBolt {
try {
Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
- for(Values values : rowToTupleMapper.toValues(result)) {
+ for(Values values : rowToTupleMapper.toValues(tuple, result)) {
this.collector.emit(values);
}
this.collector.ack(tuple);
http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
index 39ce47a..bc38b83 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
@@ -18,6 +18,7 @@
package org.apache.storm.hbase.bolt.mapper;
import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Values;
import org.apache.hadoop.hbase.client.Result;
@@ -27,11 +28,12 @@ import java.util.List;
public interface HBaseValueMapper extends Serializable {
/**
*
+ * @param input tuple.
* @param result HBase lookup result instance.
* @return list of values that should be emitted by the lookup bolt.
* @throws Exception
*/
- public List<Values> toValues(Result result) throws Exception;
+ public List<Values> toValues(ITuple input, Result result) throws Exception;
/**
* declares the output fields for the lookup bolt.
http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
index 66decf2..7b31fad 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
@@ -149,8 +149,10 @@ public class HBaseState implements State {
try {
Result[] results = hBaseClient.batchGet(gets);
- for(Result result : results) {
- List<Values> values = options.rowToStormValueMapper.toValues(result);
+ for(int i = 0; i < results.length; i++) {
+ Result result = results[i];
+ TridentTuple tuple = tridentTuples.get(i);
+ List<Values> values = options.rowToStormValueMapper.toValues(tuple, result);
batchRetrieveResult.add(values);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
index dd2ae20..2463085 100644
--- a/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
+++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
@@ -20,6 +20,7 @@ package org.apache.storm.hbase.topology;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Values;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -51,7 +52,7 @@ import java.util.List;
public class WordCountValueMapper implements HBaseValueMapper {
@Override
- public List<Values> toValues(Result result) throws Exception {
+ public List<Values> toValues(ITuple tuple, Result result) throws Exception {
List<Values> values = new ArrayList<Values>();
Cell[] cells = result.rawCells();
for(Cell cell : cells) {