You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/03/09 15:32:56 UTC
camel git commit: CAMEL-9523 Use setStopRow on org.apache.hadoop.hbase.client.Scan
Repository: camel
Updated Branches:
refs/heads/master 758c575ae -> e95b669e8
CAMEL-9523 Use setStopRow on org.apache.hadoop.hbase.client.Scan
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e95b669e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e95b669e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e95b669e
Branch: refs/heads/master
Commit: e95b669e8ac4734651ba8a632de17d5bf3ab9950
Parents: 758c575
Author: Andrea Cosentino <an...@gmail.com>
Authored: Wed Mar 9 15:30:17 2016 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Wed Mar 9 15:31:12 2016 +0100
----------------------------------------------------------------------
.../camel/component/hbase/HBaseConstants.java | 2 ++
.../camel/component/hbase/HBaseProducer.java | 9 ++++++--
.../component/hbase/HBaseProducerTest.java | 24 +++++++++++++++++++-
3 files changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e95b669e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
index 42ae4fc..3db652a 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
@@ -28,4 +28,6 @@ public interface HBaseConstants {
String HBASE_MAX_SCAN_RESULTS = "CamelHBaseMaxScanResults";
String FROM_ROW = "CamelHBaseStartRow";
+
+ String STOP_ROW = "CamelHBaseStopRow";
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e95b669e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
index b9ccfce..b09c829 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
@@ -63,6 +63,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
Integer maxScanResult = exchange.getIn().getHeader(HBaseConstants.HBASE_MAX_SCAN_RESULTS, Integer.class);
String fromRowId = (String) exchange.getIn().getHeader(HBaseConstants.FROM_ROW);
+ String stopRowId = (String) exchange.getIn().getHeader(HBaseConstants.STOP_ROW);
CellMappingStrategy mappingStrategy = endpoint.getCellMappingStrategyFactory().getStrategy(exchange.getIn());
HBaseData data = mappingStrategy.resolveModel(exchange.getIn());
@@ -82,7 +83,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
} else if (HBaseConstants.DELETE.equals(operation)) {
deleteOperations.add(createDeleteRow(hRow));
} else if (HBaseConstants.SCAN.equals(operation)) {
- scanOperationResult = scanCells(table, hRow, fromRowId, maxScanResult, endpoint.getFilters());
+ scanOperationResult = scanCells(table, hRow, fromRowId, stopRowId, maxScanResult, endpoint.getFilters());
}
}
@@ -189,7 +190,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
* Performs an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs).
* The result is <p>the most recent entry</p> for each column.
*/
- private List<HBaseRow> scanCells(Table table, HBaseRow model, String start, Integer maxRowScan, List<Filter> filters)
+ private List<HBaseRow> scanCells(Table table, HBaseRow model, String start, String stop, Integer maxRowScan, List<Filter> filters)
throws Exception {
List<HBaseRow> rowSet = new LinkedList<>();
@@ -202,6 +203,10 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
} else {
scan = new Scan();
}
+
+ if (ObjectHelper.isNotEmpty(stop)) {
+ scan.setStopRow(Bytes.toBytes(stop));
+ }
if (filters != null && !filters.isEmpty()) {
for (int i = 0; i < filters.size(); i++) {
http://git-wip-us.apache.org/repos/asf/camel/blob/e95b669e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
index cac4539..ba96175 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
@@ -275,11 +275,33 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
Object result1 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
Object result2 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
Object result3 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
-
+ System.err.println(resp.getOut().getHeaders().toString());
List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]);
assertTrue(bodies.contains(result1) && bodies.contains(result2) && bodies.contains(result3));
}
}
+
+ @Test
+ public void testPutMultiRowsAndScanWithStop() throws Exception {
+ testPutMultiRows();
+ if (systemReady) {
+ Exchange resp = template.request("direct:scan", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ exchange.getIn().setHeader(HBaseConstants.FROM_ROW, key[0]);
+ exchange.getIn().setHeader(HBaseConstants.STOP_ROW, key[1]);
+ }
+ });
+
+ Object result1 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
+ Object result2 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
+ Object result3 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
+
+ List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]);
+ assertTrue(bodies.contains(result1) && !bodies.contains(result2) && !bodies.contains(result3));
+ }
+ }
@Test
public void testPutAndScan() throws Exception {