You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/10/15 18:15:43 UTC

[30/50] git commit: STORM-519 adding tuple as an input param to HBaseValueMapper so user can chose to emit fields from original tuple in addition to lookup result.

STORM-519 adding tuple as an input param to HBaseValueMapper so user can chose to emit fields from original tuple in addition to lookup result.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f87bb313
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f87bb313
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f87bb313

Branch: refs/heads/security
Commit: f87bb3135b4dcfd2b73689a35bf0b73a6cc5bc75
Parents: 1babd83
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Oct 6 15:21:28 2014 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Oct 6 15:21:28 2014 -0700

----------------------------------------------------------------------
 external/storm-hbase/pom.xml                                   | 2 +-
 .../main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java | 2 +-
 .../org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java   | 4 +++-
 .../java/org/apache/storm/hbase/trident/state/HBaseState.java  | 6 ++++--
 .../org/apache/storm/hbase/topology/WordCountValueMapper.java  | 3 ++-
 5 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index f976cea..a492164 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.3-incubating-SNAPSHOT</version>
+        <version>0.9.3-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
index 12263a6..c6838be 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
@@ -66,7 +66,7 @@ public class HBaseLookupBolt extends AbstractHBaseBolt {
 
         try {
             Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
-            for(Values values : rowToTupleMapper.toValues(result)) {
+            for(Values values : rowToTupleMapper.toValues(tuple, result)) {
                 this.collector.emit(values);
             }
             this.collector.ack(tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
index 39ce47a..bc38b83 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
@@ -18,6 +18,7 @@
 package org.apache.storm.hbase.bolt.mapper;
 
 import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.ITuple;
 import backtype.storm.tuple.Values;
 import org.apache.hadoop.hbase.client.Result;
 
@@ -27,11 +28,12 @@ import java.util.List;
 public interface HBaseValueMapper extends Serializable {
     /**
      *
+     * @param input tuple.
      * @param result HBase lookup result instance.
      * @return list of values that should be emitted by the lookup bolt.
      * @throws Exception
      */
-    public List<Values> toValues(Result result) throws Exception;
+    public List<Values> toValues(ITuple input, Result result) throws Exception;
 
     /**
      * declares the output fields for the lookup bolt.

http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
index 66decf2..7b31fad 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
@@ -149,8 +149,10 @@ public class HBaseState implements State {
 
         try {
             Result[] results = hBaseClient.batchGet(gets);
-            for(Result result : results) {
-                List<Values> values = options.rowToStormValueMapper.toValues(result);
+            for(int i = 0; i < results.length; i++) {
+                Result result = results[i];
+                TridentTuple tuple = tridentTuples.get(i);
+                List<Values> values = options.rowToStormValueMapper.toValues(tuple, result);
                 batchRetrieveResult.add(values);
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
index dd2ae20..2463085 100644
--- a/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
+++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
@@ -20,6 +20,7 @@ package org.apache.storm.hbase.topology;
 
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
 import backtype.storm.tuple.Values;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -51,7 +52,7 @@ import java.util.List;
 public class WordCountValueMapper implements HBaseValueMapper {
 
     @Override
-    public List<Values> toValues(Result result) throws Exception {
+    public List<Values> toValues(ITuple tuple, Result result) throws Exception {
         List<Values> values = new ArrayList<Values>();
         Cell[] cells = result.rawCells();
         for(Cell cell : cells) {