You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/03/09 15:32:56 UTC

camel git commit: CAMEL-9523 Use setStopRow on org.apache.hadoop.hbase.client.Scan

Repository: camel
Updated Branches:
  refs/heads/master 758c575ae -> e95b669e8


CAMEL-9523 Use setStopRow on org.apache.hadoop.hbase.client.Scan


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e95b669e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e95b669e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e95b669e

Branch: refs/heads/master
Commit: e95b669e8ac4734651ba8a632de17d5bf3ab9950
Parents: 758c575
Author: Andrea Cosentino <an...@gmail.com>
Authored: Wed Mar 9 15:30:17 2016 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Wed Mar 9 15:31:12 2016 +0100

----------------------------------------------------------------------
 .../camel/component/hbase/HBaseConstants.java   |  2 ++
 .../camel/component/hbase/HBaseProducer.java    |  9 ++++++--
 .../component/hbase/HBaseProducerTest.java      | 24 +++++++++++++++++++-
 3 files changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e95b669e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
index 42ae4fc..3db652a 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
@@ -28,4 +28,6 @@ public interface HBaseConstants {
     String HBASE_MAX_SCAN_RESULTS = "CamelHBaseMaxScanResults";
     
     String FROM_ROW = "CamelHBaseStartRow";
+    
+    String STOP_ROW = "CamelHBaseStopRow";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e95b669e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
index b9ccfce..b09c829 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
@@ -63,6 +63,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
 
             Integer maxScanResult = exchange.getIn().getHeader(HBaseConstants.HBASE_MAX_SCAN_RESULTS, Integer.class);
             String fromRowId = (String) exchange.getIn().getHeader(HBaseConstants.FROM_ROW);
+            String stopRowId = (String) exchange.getIn().getHeader(HBaseConstants.STOP_ROW);
             CellMappingStrategy mappingStrategy = endpoint.getCellMappingStrategyFactory().getStrategy(exchange.getIn());
 
             HBaseData data = mappingStrategy.resolveModel(exchange.getIn());
@@ -82,7 +83,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
                 } else if (HBaseConstants.DELETE.equals(operation)) {
                     deleteOperations.add(createDeleteRow(hRow));
                 } else if (HBaseConstants.SCAN.equals(operation)) {
-                    scanOperationResult = scanCells(table, hRow, fromRowId, maxScanResult, endpoint.getFilters());
+                    scanOperationResult = scanCells(table, hRow, fromRowId, stopRowId, maxScanResult, endpoint.getFilters());
                 }
             }
 
@@ -189,7 +190,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
      * Performs an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs).
      * The result is <p>the most recent entry</p> for each column.
      */
-    private List<HBaseRow> scanCells(Table table, HBaseRow model, String start, Integer maxRowScan, List<Filter> filters)
+    private List<HBaseRow> scanCells(Table table, HBaseRow model, String start, String stop, Integer maxRowScan, List<Filter> filters)
             throws Exception {
         List<HBaseRow> rowSet = new LinkedList<>();
 
@@ -202,6 +203,10 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
         } else {
             scan = new Scan();
         }
+        
+        if (ObjectHelper.isNotEmpty(stop)) {
+            scan.setStopRow(Bytes.toBytes(stop));
+        }
 
         if (filters != null && !filters.isEmpty()) {
             for (int i = 0; i < filters.size(); i++) {

http://git-wip-us.apache.org/repos/asf/camel/blob/e95b669e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
index cac4539..ba96175 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
@@ -275,11 +275,33 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
             Object result1 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
             Object result2 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
             Object result3 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
-
+            System.err.println(resp.getOut().getHeaders().toString());
             List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]);
             assertTrue(bodies.contains(result1) && bodies.contains(result2) && bodies.contains(result3));
         }
     }
+    
+    @Test
+    public void testPutMultiRowsAndScanWithStop() throws Exception {
+        testPutMultiRows();
+        if (systemReady) {
+            Exchange resp = template.request("direct:scan", new Processor() {
+                public void process(Exchange exchange) throws Exception {
+                    exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+                    exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+                    exchange.getIn().setHeader(HBaseConstants.FROM_ROW, key[0]);
+                    exchange.getIn().setHeader(HBaseConstants.STOP_ROW, key[1]);
+                }
+            });
+
+            Object result1 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
+            Object result2 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
+            Object result3 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
+
+            List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]);
+            assertTrue(bodies.contains(result1) && !bodies.contains(result2) && !bodies.contains(result3));
+        }
+    }
 
     @Test
     public void testPutAndScan() throws Exception {