You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/25 17:24:45 UTC
camel git commit: CAMEL-9644 - camel-hbase : remove deprecated code
Repository: camel
Updated Branches:
refs/heads/master 800d74f70 -> bb3a75efc
CAMEL-9644 - camel-hbase : remove deprecated code
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bb3a75ef
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bb3a75ef
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bb3a75ef
Branch: refs/heads/master
Commit: bb3a75efcec72fa1c9f45069353a091b77697d95
Parents: 800d74f
Author: lburgazzoli <lb...@gmail.com>
Authored: Thu Feb 25 15:35:46 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Feb 25 17:14:32 2016 +0100
----------------------------------------------------------------------
.../camel/component/hbase/HBaseAttribute.java | 73 +++++++++
.../camel/component/hbase/HBaseComponent.java | 18 ++-
.../camel/component/hbase/HBaseConsumer.java | 26 ++--
.../camel/component/hbase/HBaseContats.java | 34 -----
.../component/hbase/HBaseDeleteHandler.java | 5 +-
.../camel/component/hbase/HBaseEndpoint.java | 49 +++---
.../camel/component/hbase/HBaseProducer.java | 59 ++++----
.../component/hbase/HBaseRemoveHandler.java | 4 +-
.../camel/component/hbase/HbaseAttribute.java | 65 --------
.../hbase/mapping/CellMappingStrategy.java | 2 +-
.../mapping/CellMappingStrategyFactory.java | 10 +-
.../hbase/mapping/HeaderMappingStrategy.java | 26 ++--
.../idempotent/HBaseIdempotentRepository.java | 72 +++------
.../component/hbase/CamelHBaseFilterTest.java | 13 +-
.../component/hbase/CamelHBaseTestSupport.java | 10 +-
.../component/hbase/HBaseConsumerTest.java | 12 +-
.../component/hbase/HBaseConvertionsTest.java | 25 ++--
.../component/hbase/HBaseProducerTest.java | 148 ++++++++++---------
.../HBaseIdempotentRepositoryTest.java | 3 -
.../src/test/resources/log4j.properties | 1 -
parent/pom.xml | 2 +-
21 files changed, 312 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseAttribute.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseAttribute.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseAttribute.java
new file mode 100644
index 0000000..56b190e
--- /dev/null
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseAttribute.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hbase;
+
+public enum HBaseAttribute { // Each constant pairs a "CamelHBase..." header name with an option name derived from it.
+
+ HBASE_ROW_ID("CamelHBaseRowId"), // option form: "rowId"
+ HBASE_ROW_TYPE("CamelHBaseRowType"), // option form: "rowType"
+ HBASE_MARKED_ROW_ID("CamelHBaseMarkedRowId"), // option form: "markedRowId"
+ HBASE_FAMILY("CamelHBaseFamily"), // option form: "family"
+ HBASE_QUALIFIER("CamelHBaseQualifier"), // option form: "qualifier"
+ HBASE_VALUE("CamelHBaseValue"), // option form: "value"
+ HBASE_VALUE_TYPE("CamelHBaseValueType"); // option form: "valueType"
+
+ private final String value; // full header name, e.g. "CamelHBaseRowId"
+ private final String option; // option name, computed once in the constructor
+
+ HBaseAttribute(String value) { // stores the header name and pre-computes the option name from it
+ this.value = value;
+ this.option = asOption(value);
+ }
+
+ public String asHeader(int i) { // header name for the i-th entry: the index is appended only when i > 1
+ if (i > 1) {
+ return value + i;
+ } else {
+ return value; // i <= 1 yields the plain, unsuffixed header name
+ }
+ }
+
+ public String asHeader() { // plain header name, without any index suffix
+ return value;
+ }
+
+ public String asOption() { // plain option name, without any index suffix
+ return option;
+ }
+
+ public String asOption(int i) { // option name for the i-th entry: the index is appended only when i > 1
+ if (i > 1) {
+ return option + i;
+ } else {
+ return option; // i <= 1 yields the plain, unsuffixed option name
+ }
+ }
+
+ @Override
+ public String toString() { // same text as asHeader()
+ return value;
+ }
+
+ private static String asOption(String name) { // derives the option name from a header name
+ StringBuilder sb = new StringBuilder();
+ sb.append(name, "CamelHBase".length(), name.length()); // keeps what follows the first "CamelHBase".length() characters; the prefix itself is not checked
+ sb.setCharAt(0, Character.toLowerCase(sb.charAt(0))); // lower-case the first remaining character, e.g. "RowId" -> "rowId"
+
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java
index dcb8ac6..731b288 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java
@@ -17,13 +17,15 @@
package org.apache.camel.component.hbase;
import java.util.Map;
+import java.util.concurrent.Executors;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;
import org.apache.camel.util.IntrospectionSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
/**
* Represents the component that manages {@link HBaseEndpoint}.
@@ -31,7 +33,7 @@ import org.apache.hadoop.hbase.client.HTablePool;
public class HBaseComponent extends UriEndpointComponent {
private Configuration configuration;
- private HTablePool tablePool;
+ private Connection connection;
private int poolMaxSize = 10;
public HBaseComponent() {
@@ -43,18 +45,22 @@ public class HBaseComponent extends UriEndpointComponent {
if (configuration == null) {
configuration = HBaseConfiguration.create();
}
- tablePool = new HTablePool(configuration, poolMaxSize);
+
+ connection = ConnectionFactory.createConnection(
+ configuration,
+ Executors.newFixedThreadPool(poolMaxSize)
+ );
}
@Override
protected void doStop() throws Exception {
- if (tablePool != null) {
- tablePool.close();
+ if (connection != null) {
+ connection.close();
}
}
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
- HBaseEndpoint endpoint = new HBaseEndpoint(uri, this, tablePool, remaining);
+ HBaseEndpoint endpoint = new HBaseEndpoint(uri, this, connection, remaining);
Map<String, Object> mapping = IntrospectionSupport.extractProperties(parameters, "row.");
endpoint.setRowMapping(mapping);
setProperties(endpoint, parameters);
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
index 59aaa43..b54b284 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
@@ -33,10 +33,10 @@ import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
@@ -61,15 +61,14 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer {
@Override
protected int poll() throws Exception {
- HTableInterface table = endpoint.getTable();
- try {
+ try (Table table = endpoint.getTable()) {
shutdownRunningTask = null;
pendingExchanges = 0;
- Queue<Exchange> queue = new LinkedList<Exchange>();
+ Queue<Exchange> queue = new LinkedList<>();
Scan scan = new Scan();
- List<Filter> filters = new LinkedList<Filter>();
+ List<Filter> filters = new LinkedList<>();
if (endpoint.getFilters() != null) {
filters.addAll(endpoint.getFilters());
}
@@ -111,8 +110,10 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer {
HBaseCell resultCell = new HBaseCell();
String family = modelCell.getFamily();
String column = modelCell.getQualifier();
- resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo(modelCell.getValueType(),
- result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column))));
+ resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo(
+ modelCell.getValueType(),
+ result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)))
+ );
resultCell.setFamily(modelCell.getFamily());
resultCell.setQualifier(modelCell.getQualifier());
resultRow.getCells().add(resultCell);
@@ -136,15 +137,13 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer {
exchange.getIn().setHeader(CellMappingStrategyFactory.STRATEGY, CellMappingStrategyFactory.BODY);
mappingStrategy.applyScanResults(exchange.getIn(), data);
//Make sure that there is a header containing the marked row ids, so that they can be deleted.
- exchange.getIn().setHeader(HbaseAttribute.HBASE_MARKED_ROW_ID.asHeader(), result.getRow());
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_MARKED_ROW_ID.asHeader(), result.getRow());
queue.add(exchange);
exchangeCount++;
}
}
scanner.close();
return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue));
- } finally {
- table.close();
}
}
@@ -177,7 +176,7 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer {
}
if (endpoint.isRemove()) {
- remove((byte[]) exchange.getIn().getHeader(HbaseAttribute.HBASE_MARKED_ROW_ID.asHeader()));
+ remove((byte[]) exchange.getIn().getHeader(HBaseAttribute.HBASE_MARKED_ROW_ID.asHeader()));
}
}
@@ -188,11 +187,8 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer {
* Delegates to the {@link HBaseRemoveHandler}.
*/
private void remove(byte[] row) throws IOException {
- HTableInterface table = endpoint.getTable();
- try {
+ try (Table table = endpoint.getTable()) {
endpoint.getRemoveHandler().remove(table, row);
- } finally {
- table.close();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseContats.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseContats.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseContats.java
deleted file mode 100644
index 1536f78..0000000
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseContats.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.hbase;
-
-@Deprecated
-public final class HBaseContats {
-
- public static final String OPERATION = "CamelHBaseOperation";
-
- public static final String PUT = "CamelHBasePut";
- public static final String GET = "CamelHBaseGet";
- public static final String SCAN = "CamelHBaseScan";
- public static final String DELETE = "CamelHBaseDelete";
-
- public static final String HBASE_MAX_SCAN_RESULTS = "CamelHBaseMaxScanResults";
-
- private HBaseContats() {
- //Utility Class
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseDeleteHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseDeleteHandler.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseDeleteHandler.java
index d1901a5..c582cc1 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseDeleteHandler.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseDeleteHandler.java
@@ -17,8 +17,9 @@
package org.apache.camel.component.hbase;
import java.io.IOException;
+
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +31,7 @@ public class HBaseDeleteHandler implements HBaseRemoveHandler {
* Performs a {@link Delete} of the specified row.
*/
@Override
- public void remove(HTableInterface table, byte[] row) {
+ public void remove(Table table, byte[] row) {
Delete delete = new Delete(row);
try {
table.delete(delete);
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
index 27c5773..81bfe7f 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.hbase;
+import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.List;
import java.util.Map;
@@ -32,9 +33,10 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.security.UserGroupInformation;
@@ -45,7 +47,7 @@ import org.apache.hadoop.security.UserGroupInformation;
public class HBaseEndpoint extends DefaultEndpoint {
private Configuration configuration;
- private final HTablePool tablePool;
+ private final Connection connection;
private HBaseAdmin admin;
@UriPath(description = "The name of the table") @Metadata(required = "true")
@@ -80,10 +82,10 @@ public class HBaseEndpoint extends DefaultEndpoint {
*/
private byte[] tableNameBytes;
- public HBaseEndpoint(String uri, HBaseComponent component, HTablePool tablePool, String tableName) {
+ public HBaseEndpoint(String uri, HBaseComponent component, Connection connection, String tableName) {
super(uri, component);
this.tableName = tableName;
- this.tablePool = tablePool;
+ this.connection = connection;
if (this.tableName == null) {
throw new IllegalArgumentException("Table name can not be null");
} else {
@@ -275,21 +277,30 @@ public class HBaseEndpoint extends DefaultEndpoint {
}
}
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ }
+
/**
* Gets connection to the table (secured or not, depends on the object initialization)
* please remember to close the table after use
* @return table, remember to close!
*/
- public HTableInterface getTable() {
+ public Table getTable() throws IOException {
if (userGroupInformation != null) {
- return userGroupInformation.doAs(new PrivilegedAction<HTableInterface>() {
+ return userGroupInformation.doAs(new PrivilegedAction<Table>() {
@Override
- public HTableInterface run() {
- return tablePool.getTable(tableNameBytes);
+ public Table run() {
+ try {
+ return connection.getTable(TableName.valueOf(tableNameBytes));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
});
} else {
- return tablePool.getTable(tableNameBytes);
+ return connection.getTable(TableName.valueOf(tableNameBytes));
}
}
@@ -298,20 +309,20 @@ public class HBaseEndpoint extends DefaultEndpoint {
*/
private HBaseRow createRowModel(Map<String, Object> parameters) {
HBaseRow rowModel = new HBaseRow();
- if (parameters.containsKey(HbaseAttribute.HBASE_ROW_TYPE.asOption())) {
- String rowType = String.valueOf(parameters.remove(HbaseAttribute.HBASE_ROW_TYPE.asOption()));
+ if (parameters.containsKey(HBaseAttribute.HBASE_ROW_TYPE.asOption())) {
+ String rowType = String.valueOf(parameters.remove(HBaseAttribute.HBASE_ROW_TYPE.asOption()));
if (rowType != null && !rowType.isEmpty()) {
rowModel.setRowType(getCamelContext().getClassResolver().resolveClass(rowType));
}
}
- for (int i = 1; parameters.get(HbaseAttribute.HBASE_FAMILY.asOption(i)) != null
- && parameters.get(HbaseAttribute.HBASE_QUALIFIER.asOption(i)) != null; i++) {
+ for (int i = 1; parameters.get(HBaseAttribute.HBASE_FAMILY.asOption(i)) != null
+ && parameters.get(HBaseAttribute.HBASE_QUALIFIER.asOption(i)) != null; i++) {
HBaseCell cellModel = new HBaseCell();
- cellModel.setFamily(String.valueOf(parameters.remove(HbaseAttribute.HBASE_FAMILY.asOption(i))));
- cellModel.setQualifier(String.valueOf(parameters.remove(HbaseAttribute.HBASE_QUALIFIER.asOption(i))));
- cellModel.setValue(String.valueOf(parameters.remove(HbaseAttribute.HBASE_VALUE.asOption(i))));
- if (parameters.containsKey(HbaseAttribute.HBASE_VALUE_TYPE.asOption(i))) {
- String valueType = String.valueOf(parameters.remove(HbaseAttribute.HBASE_VALUE_TYPE.asOption(i)));
+ cellModel.setFamily(String.valueOf(parameters.remove(HBaseAttribute.HBASE_FAMILY.asOption(i))));
+ cellModel.setQualifier(String.valueOf(parameters.remove(HBaseAttribute.HBASE_QUALIFIER.asOption(i))));
+ cellModel.setValue(String.valueOf(parameters.remove(HBaseAttribute.HBASE_VALUE.asOption(i))));
+ if (parameters.containsKey(HBaseAttribute.HBASE_VALUE_TYPE.asOption(i))) {
+ String valueType = String.valueOf(parameters.remove(HBaseAttribute.HBASE_VALUE_TYPE.asOption(i)));
if (valueType != null && !valueType.isEmpty()) {
rowModel.setRowType(getCamelContext().getClassResolver().resolveClass(valueType));
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
index 93f8bb0..b9ccfce 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
@@ -30,14 +30,14 @@ import org.apache.camel.component.hbase.model.HBaseData;
import org.apache.camel.component.hbase.model.HBaseRow;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;
@@ -57,9 +57,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
}
public void process(Exchange exchange) throws Exception {
- HTableInterface table = endpoint.getTable();
- try {
-
+ try (Table table = endpoint.getTable()) {
updateHeaders(exchange);
String operation = (String) exchange.getIn().getHeader(HBaseConstants.OPERATION);
@@ -69,10 +67,10 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
HBaseData data = mappingStrategy.resolveModel(exchange.getIn());
- List<Put> putOperations = new LinkedList<Put>();
- List<Delete> deleteOperations = new LinkedList<Delete>();
- List<HBaseRow> getOperationResult = new LinkedList<HBaseRow>();
- List<HBaseRow> scanOperationResult = new LinkedList<HBaseRow>();
+ List<Put> putOperations = new LinkedList<>();
+ List<Delete> deleteOperations = new LinkedList<>();
+ List<HBaseRow> getOperationResult = new LinkedList<>();
+ List<HBaseRow> scanOperationResult = new LinkedList<>();
for (HBaseRow hRow : data.getRows()) {
hRow.apply(rowModel);
@@ -91,7 +89,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
//Check if we have something to add.
if (!putOperations.isEmpty()) {
table.put(putOperations);
- table.flushCommits();
} else if (!deleteOperations.isEmpty()) {
table.delete(deleteOperations);
} else if (!getOperationResult.isEmpty()) {
@@ -99,8 +96,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
} else if (!scanOperationResult.isEmpty()) {
mappingStrategy.applyScanResults(exchange.getOut(), new HBaseData(scanOperationResult));
}
- } finally {
- table.close();
}
}
@@ -124,8 +119,11 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
ObjectHelper.notNull(family, "HBase column family", cell);
ObjectHelper.notNull(column, "HBase column", cell);
- put.add(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column),
- endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, value));
+ put.addColumn(
+ HBaseHelper.getHBaseFieldAsBytes(family),
+ HBaseHelper.getHBaseFieldAsBytes(column),
+ endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, value)
+ );
}
return put;
}
@@ -134,9 +132,9 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
* Performs an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs).
* The result is <p>the most recent entry</p> for each column.
*/
- private HBaseRow getCells(HTableInterface table, HBaseRow hRow) throws Exception {
+ private HBaseRow getCells(Table table, HBaseRow hRow) throws Exception {
HBaseRow resultRow = new HBaseRow();
- List<HBaseCell> resultCells = new LinkedList<HBaseCell>();
+ List<HBaseCell> resultCells = new LinkedList<>();
ObjectHelper.notNull(hRow, "HBase row");
ObjectHelper.notNull(hRow.getId(), "HBase row id");
ObjectHelper.notNull(hRow.getCells(), "HBase cells");
@@ -166,11 +164,10 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
resultCell.setFamily(family);
resultCell.setQualifier(column);
- List<KeyValue> kvs = result.getColumn(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column));
+ List<Cell> kvs = result.getColumnCells(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column));
if (kvs != null && !kvs.isEmpty()) {
//Return the most recent entry.
- resultCell
- .setValue(endpoint.getCamelContext().getTypeConverter().convertTo(cellModel.getValueType(), kvs.get(0).getValue()));
+ resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo(cellModel.getValueType(), kvs.get(0).getValue()));
resultCell.setTimestamp(kvs.get(0).getTimestamp());
}
resultCells.add(resultCell);
@@ -192,9 +189,9 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
* Performs an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs).
* The result is <p>the most recent entry</p> for each column.
*/
- private List<HBaseRow> scanCells(HTableInterface table, HBaseRow model, String start, Integer maxRowScan, List<Filter> filters)
+ private List<HBaseRow> scanCells(Table table, HBaseRow model, String start, Integer maxRowScan, List<Filter> filters)
throws Exception {
- List<HBaseRow> rowSet = new LinkedList<HBaseRow>();
+ List<HBaseRow> rowSet = new LinkedList<>();
HBaseRow startRow = new HBaseRow(model.getCells());
startRow.setId(start);
@@ -237,16 +234,22 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
HBaseCell resultCell = new HBaseCell();
String family = modelCell.getFamily();
String column = modelCell.getQualifier();
- resultRow.setId(endpoint.getCamelContext().getTypeConverter().convertTo(model.getRowType(), result.getRow()));
- resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo(modelCell.getValueType(),
- result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column))));
+
+ resultRow.setId(endpoint.getCamelContext().getTypeConverter().convertTo(
+ model.getRowType(),
+ result.getRow())
+ );
+ resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo(
+ modelCell.getValueType(),
+ result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)))
+ );
+
resultCell.setFamily(modelCell.getFamily());
resultCell.setQualifier(modelCell.getQualifier());
- if (result.getColumnLatest(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)) != null) {
- resultCell.setTimestamp(
- result.getColumnLatest(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column))
- .getTimestamp());
+ Cell cell = result.getColumnLatestCell(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column));
+ if (cell != null) {
+ resultCell.setTimestamp(cell.getTimestamp());
}
resultRow.getCells().add(resultCell);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseRemoveHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseRemoveHandler.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseRemoveHandler.java
index 18e3a63..0ea8d69 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseRemoveHandler.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseRemoveHandler.java
@@ -16,7 +16,7 @@
*/
package org.apache.camel.component.hbase;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
public interface HBaseRemoveHandler {
@@ -25,5 +25,5 @@ public interface HBaseRemoveHandler {
* The removal is not necessarily physical remove.
* The implementation decides how a row can be considered as removed.
*/
- void remove(HTableInterface table, byte[] row);
+ void remove(Table table, byte[] row);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HbaseAttribute.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HbaseAttribute.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HbaseAttribute.java
deleted file mode 100644
index d099096..0000000
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HbaseAttribute.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.hbase;
-
-public enum HbaseAttribute {
-
- HBASE_ROW_ID("CamelHBaseRowId"),
- HBASE_ROW_TYPE("CamelHBaseRowType"),
- HBASE_MARKED_ROW_ID("CamelHBaseMarkedRowId"),
- HBASE_FAMILY("CamelHBaseFamily"),
- HBASE_QUALIFIER("CamelHBaseQualifier"),
- HBASE_VALUE("CamelHBaseValue"),
- HBASE_VALUE_TYPE("CamelHBaseValueType");
-
- private final String value;
-
- private HbaseAttribute(String value) {
- this.value = value;
- }
-
- public String asHeader(int i) {
- if (i > 1) {
- return value + i;
- } else {
- return value;
- }
- }
-
- public String asHeader() {
- return value;
- }
-
- public String asOption() {
- String normalizedValue = value.replaceAll("CamelHBase", "");
- return normalizedValue.substring(0, 1).toLowerCase() + normalizedValue.substring(1);
- }
-
- public String asOption(int i) {
- String option = asOption();
- if (i > 1) {
- return option + i;
- } else {
- return option;
- }
- }
-
- @Override
- public String toString() {
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategy.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategy.java
index 47ffba2..56b352d 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategy.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategy.java
@@ -34,7 +34,7 @@ public interface CellMappingStrategy {
HBaseData resolveModel(Message message);
/**
- * Applies the KeyValues of a get opration to the {@link Exchange}.
+ * Applies the KeyValues of a get operation to the {@link Exchange}.
*
* @param message The message that will be applied the Get result.
* @param data The rows that will be applied to the message.
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategyFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategyFactory.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategyFactory.java
index e51e8c3..e795f22 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategyFactory.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategyFactory.java
@@ -31,11 +31,11 @@ public class CellMappingStrategyFactory {
public static final String BODY = "body";
private static final Logger LOG = LoggerFactory.getLogger(CellMappingStrategyFactory.class);
- private static final Map<String, CellMappingStrategy> DEFAULT_STRATIGIES = new HashMap<String, CellMappingStrategy>();
+ private static final Map<String, CellMappingStrategy> DEFAULT_STRATEGIES = new HashMap<String, CellMappingStrategy>();
public CellMappingStrategyFactory() {
- DEFAULT_STRATIGIES.put(HEADER, new HeaderMappingStrategy());
- DEFAULT_STRATIGIES.put(BODY, new BodyMappingStrategy());
+ DEFAULT_STRATEGIES.put(HEADER, new HeaderMappingStrategy());
+ DEFAULT_STRATEGIES.put(BODY, new BodyMappingStrategy());
}
public CellMappingStrategy getStrategy(Message message) {
@@ -43,7 +43,7 @@ public class CellMappingStrategyFactory {
//Check if strategy has been explicitly set.
if (message.getHeader(STRATEGY) != null) {
- strategy = DEFAULT_STRATIGIES.get(message.getHeader(STRATEGY, String.class));
+ strategy = DEFAULT_STRATEGIES.get(message.getHeader(STRATEGY, String.class));
}
if (strategy == null && message.getHeader(STRATEGY_CLASS_NAME) != null) {
@@ -55,7 +55,7 @@ public class CellMappingStrategyFactory {
}
//Fallback to the default strategy.
- return DEFAULT_STRATIGIES.get(HEADER);
+ return DEFAULT_STRATEGIES.get(HEADER);
}
private CellMappingStrategy loadStrategyFromClassName(String strategyClassName) {
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java
index d8c54bd..947f973 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java
@@ -21,7 +21,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
-import org.apache.camel.component.hbase.HbaseAttribute;
+import org.apache.camel.component.hbase.HBaseAttribute;
import org.apache.camel.component.hbase.model.HBaseCell;
import org.apache.camel.component.hbase.model.HBaseData;
import org.apache.camel.component.hbase.model.HBaseRow;
@@ -47,14 +47,14 @@ public class HeaderMappingStrategy implements CellMappingStrategy {
HBaseCell hCell = new HBaseCell();
if (message != null) {
- Object id = message.getHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(index));
- String rowClassName = message.getHeader(HbaseAttribute.HBASE_ROW_TYPE.asHeader(index), String.class);
+ Object id = message.getHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(index));
+ String rowClassName = message.getHeader(HBaseAttribute.HBASE_ROW_TYPE.asHeader(index), String.class);
Class<?> rowClass = rowClassName == null || rowClassName.isEmpty() ? String.class : message.getExchange().getContext().getClassResolver().resolveClass(rowClassName);
- String columnFamily = (String) message.getHeader(HbaseAttribute.HBASE_FAMILY.asHeader(index));
- String columnName = (String) message.getHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(index));
- Object value = message.getHeader(HbaseAttribute.HBASE_VALUE.asHeader(index));
+ String columnFamily = (String) message.getHeader(HBaseAttribute.HBASE_FAMILY.asHeader(index));
+ String columnName = (String) message.getHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(index));
+ Object value = message.getHeader(HBaseAttribute.HBASE_VALUE.asHeader(index));
- String valueClassName = message.getHeader(HbaseAttribute.HBASE_VALUE_TYPE.asHeader(index), String.class);
+ String valueClassName = message.getHeader(HBaseAttribute.HBASE_VALUE_TYPE.asHeader(index), String.class);
Class<?> valueClass = valueClassName == null || valueClassName.isEmpty() ? String.class : message.getExchange().getContext().getClassResolver().resolveClass(valueClassName);
//Id can be accepted as null when using get, scan etc.
@@ -84,7 +84,7 @@ public class HeaderMappingStrategy implements CellMappingStrategy {
int index = 1;
HBaseData data = new HBaseData();
//We use a LinkedHashMap to preserve the order.
- Map<Object, HBaseRow> rows = new LinkedHashMap<Object, HBaseRow>();
+ Map<Object, HBaseRow> rows = new LinkedHashMap<>();
HBaseRow hRow = new HBaseRow();
while (hRow != null) {
hRow = resolveRow(message, index++);
@@ -116,7 +116,7 @@ public class HeaderMappingStrategy implements CellMappingStrategy {
if (hRow.getId() != null) {
Set<HBaseCell> cells = hRow.getCells();
for (HBaseCell cell : cells) {
- message.setHeader(HbaseAttribute.HBASE_VALUE.asHeader(index++), getValueForColumn(cells, cell.getFamily(), cell.getQualifier()));
+ message.setHeader(HBaseAttribute.HBASE_VALUE.asHeader(index++), getValueForColumn(cells, cell.getFamily(), cell.getQualifier()));
}
}
}
@@ -136,10 +136,10 @@ public class HeaderMappingStrategy implements CellMappingStrategy {
for (HBaseRow hRow : data.getRows()) {
Set<HBaseCell> cells = hRow.getCells();
for (HBaseCell cell : cells) {
- message.setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(index), hRow.getId());
- message.setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(index), cell.getFamily());
- message.setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(index), cell.getQualifier());
- message.setHeader(HbaseAttribute.HBASE_VALUE.asHeader(index), cell.getValue());
+ message.setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(index), hRow.getId());
+ message.setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(index), cell.getFamily());
+ message.setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(index), cell.getQualifier());
+ message.setHeader(HBaseAttribute.HBASE_VALUE.asHeader(index), cell.getValue());
}
index++;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java
index 05900a1..92d0198 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java
@@ -16,23 +16,22 @@
*/
package org.apache.camel.component.hbase.processor.idempotent;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.ObjectOutputStream;
import org.apache.camel.component.hbase.HBaseHelper;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.IOHelper;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,15 +41,18 @@ public class HBaseIdempotentRepository extends ServiceSupport implements Idempot
private final String tableName;
private final String family;
- private final String qualifer;
- private final HTable table;
+ private final String qualifier;
+ private final Configuration configuration;
+ private Connection connection;
+ private Table table;
public HBaseIdempotentRepository(Configuration configuration, String tableName, String family, String qualifier) throws IOException {
this.tableName = tableName;
this.family = family;
- this.qualifer = qualifier;
- //In the case of idempotent repository we do not want to catch exceptions related to HTable.
- this.table = new HTable(configuration, tableName);
+ this.qualifier = qualifier;
+ this.configuration = configuration;
+ this.connection = null;
+ this.table = null;
}
@Override
@@ -60,11 +62,10 @@ public class HBaseIdempotentRepository extends ServiceSupport implements Idempot
if (contains(o)) {
return false;
}
- byte[] b = toBytes(o);
+ byte[] b = HBaseHelper.toBytes(o);
Put put = new Put(b);
- put.add(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(qualifer), b);
+ put.addColumn(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(qualifier), b);
table.put(put);
- table.flushCommits();
return true;
}
} catch (Exception e) {
@@ -76,9 +77,9 @@ public class HBaseIdempotentRepository extends ServiceSupport implements Idempot
@Override
public boolean contains(Object o) {
try {
- byte[] b = toBytes(o);
+ byte[] b = HBaseHelper.toBytes(o);
Get get = new Get(b);
- get.addColumn(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(qualifer));
+ get.addColumn(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(qualifier));
return table.exists(get);
} catch (Exception e) {
LOG.warn("Error reading object {} from HBase repository.", o);
@@ -89,7 +90,7 @@ public class HBaseIdempotentRepository extends ServiceSupport implements Idempot
@Override
public boolean remove(Object o) {
try {
- byte[] b = toBytes(o);
+ byte[] b = HBaseHelper.toBytes(o);
if (table.exists(new Get(b))) {
Delete delete = new Delete(b);
table.delete(delete);
@@ -125,43 +126,18 @@ public class HBaseIdempotentRepository extends ServiceSupport implements Idempot
@Override
protected void doStart() throws Exception {
- // noop
+ this.connection = ConnectionFactory.createConnection(configuration);
+ this.table = this.connection.getTable(TableName.valueOf(tableName));
}
@Override
protected void doStop() throws Exception {
- // noop
- }
+ if (table != null) {
+ table.close();
+ }
- private byte[] toBytes(Object obj) {
- if (obj instanceof byte[]) {
- return (byte[]) obj;
- } else if (obj instanceof Byte) {
- return Bytes.toBytes((Byte) obj);
- } else if (obj instanceof Short) {
- return Bytes.toBytes((Short) obj);
- } else if (obj instanceof Integer) {
- return Bytes.toBytes((Integer) obj);
- } else if (obj instanceof Long) {
- return Bytes.toBytes((Long) obj);
- } else if (obj instanceof Double) {
- return Bytes.toBytes((Double) obj);
- } else if (obj instanceof String) {
- return Bytes.toBytes((String) obj);
- } else {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = null;
- try {
- oos = new ObjectOutputStream(baos);
- oos.writeObject(obj);
- return baos.toByteArray();
- } catch (IOException e) {
- LOG.warn("Error while serializing object. Null will be used.", e);
- return null;
- } finally {
- IOHelper.close(oos);
- IOHelper.close(baos);
- }
+ if (connection != null) {
+ connection.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
index 5cc0286..0079ff3 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
@@ -50,15 +50,15 @@ public class CamelHBaseFilterTest extends CamelHBaseTestSupport {
Endpoint endpoint = context.getEndpoint("direct:scan");
Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]);
Exchange resp = template.send(endpoint, exchange);
Message out = resp.getOut();
assertTrue("two first keys returned",
- out.getHeaders().containsValue(body[0][0][0])
- && out.getHeaders().containsValue(body[1][0][0])
- && !out.getHeaders().containsValue(body[2][0][0]));
+ out.getHeaders().containsValue(body[0][0][0])
+ && out.getHeaders().containsValue(body[1][0][0])
+ && !out.getHeaders().containsValue(body[2][0][0]));
}
}
@@ -73,7 +73,6 @@ public class CamelHBaseFilterTest extends CamelHBaseTestSupport {
public void configure() {
from("direct:start")
.to("hbase://" + PERSON_TABLE);
-
from("direct:scan")
.to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2");
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
index 937ef22..343852d 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
@@ -25,8 +25,11 @@ import org.apache.camel.util.IOHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -49,7 +52,6 @@ public abstract class CamelHBaseTestSupport extends CamelTestSupport {
protected String[] key = {"1", "2", "3"};
protected final String[] family = {"info", "birthdate", "address"};
- //comlumn[family][column]
protected final String[][] column = {
{"id", "firstName", "lastName"},
{"day", "month", "year"},
@@ -117,7 +119,9 @@ public abstract class CamelHBaseTestSupport extends CamelTestSupport {
protected void putMultipleRows() throws IOException {
Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
- HTable table = new HTable(configuration, PERSON_TABLE.getBytes());
+ Connection connection = ConnectionFactory.createConnection(configuration);
+ Table table = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes()));
+
for (int r = 0; r < key.length; r++) {
Put put = new Put(key[r].getBytes());
put.add(family[0].getBytes(), column[0][0].getBytes(), body[r][0][0].getBytes());
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
index deda182..44e2461 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
@@ -31,13 +31,12 @@ public class HBaseConsumerTest extends CamelHBaseTestSupport {
MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
mockEndpoint.expectedMessageCount(3);
- Map<String, Object> headers = new HashMap<String, Object>();
-
+ Map<String, Object> headers = new HashMap<>();
for (int row = 0; row < key.length; row++) {
- headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
- headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
- headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
- headers.put(HbaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]);
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]);
}
headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
@@ -58,7 +57,6 @@ public class HBaseConsumerTest extends CamelHBaseTestSupport {
public void configure() {
from("direct:start")
.to("hbase://" + PERSON_TABLE);
-
from("hbase://" + PERSON_TABLE)
.to("mock:result");
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
index 9813744..3535ad4 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
@@ -41,20 +41,20 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport {
if (systemReady) {
ProducerTemplate template = context.createProducerTemplate();
Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
- headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), INFO_FAMILY);
- headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
- headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), body[0]);
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), INFO_FAMILY);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0]);
- headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
- headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(2), INFO_FAMILY);
- headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]);
- headers.put(HbaseAttribute.HBASE_VALUE.asHeader(2), body[1]);
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(2), INFO_FAMILY);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(2), body[1]);
- headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(3), key[2]);
- headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(3), INFO_FAMILY);
- headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(3), column[0]);
- headers.put(HbaseAttribute.HBASE_VALUE.asHeader(3), body[2]);
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(3), key[2]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(3), INFO_FAMILY);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(3), column[0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(3), body[2]);
headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
@@ -99,7 +99,6 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport {
public void configure() {
from("direct:start")
.to("hbase://" + PERSON_TABLE);
-
from("direct:scan")
.to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&row.family=family1&row.qualifier=column1");
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
index fa3f229..cac4539 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
@@ -26,9 +26,13 @@ import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.util.IOHelper;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
import org.junit.Test;
public class HBaseProducerTest extends CamelHBaseTestSupport {
@@ -36,16 +40,18 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
@Test
public void testPut() throws Exception {
if (systemReady) {
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
- headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
- headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]);
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]);
headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
template.sendBodyAndHeaders("direct:start", null, headers);
Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
- HTable table = new HTable(configuration, PERSON_TABLE.getBytes());
+ Connection connection = ConnectionFactory.createConnection(configuration);
+ Table table = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes()));
+
Get get = new Get(key[0].getBytes());
get.addColumn(family[0].getBytes(), column[0][0].getBytes());
@@ -63,30 +69,30 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
if (systemReady) {
Exchange resp = template.request("direct:start", new Processor() {
public void process(Exchange exchange) throws Exception {
- exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
}
});
- assertEquals(body[0][0][0], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader()));
+ assertEquals(body[0][0][0], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
}
}
@Test
public void testPutAndGetWithModel() throws Exception {
if (systemReady) {
- Map<String, Object> headers = new HashMap<String, Object>();
+ Map<String, Object> headers = new HashMap<>();
headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
int index = 1;
for (int row = 0; row < key.length; row++) {
for (int fam = 0; fam < family.length; fam++) {
for (int col = 0; col < column[fam].length; col++) {
- headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(index), key[row]);
- headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(index), family[fam]);
- headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(index), column[fam][col]);
- headers.put(HbaseAttribute.HBASE_VALUE.asHeader(index++), body[row][fam][col]);
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(index), key[row]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(index), family[fam]);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(index), column[fam][col]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(index++), body[row][fam][col]);
}
}
}
@@ -95,26 +101,26 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
Exchange resp = template.request("direct:start-with-model", new Processor() {
public void process(Exchange exchange) throws Exception {
- exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
}
});
- assertEquals(body[0][0][1], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader()));
- assertEquals(body[0][1][2], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2)));
+ assertEquals(body[0][0][1], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
+ assertEquals(body[0][1][2], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2)));
}
}
@Test
public void testPutMultiRows() throws Exception {
if (systemReady) {
- Map<String, Object> headers = new HashMap<String, Object>();
+ Map<String, Object> headers = new HashMap<>();
headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
for (int row = 0; row < key.length; row++) {
- headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
- headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
- headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
- headers.put(HbaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]);
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]);
}
template.sendBodyAndHeaders("direct:start", null, headers);
@@ -143,15 +149,15 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
for (int row = 0; row < key.length; row++) {
- exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
}
}
});
for (int row = 0; row < key.length; row++) {
- assertEquals(body[row][0][0], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(row + 1)));
+ assertEquals(body[row][0][0], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(row + 1)));
}
}
}
@@ -159,19 +165,20 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
@Test
public void testPutMultiColumns() throws Exception {
if (systemReady) {
- Map<String, Object> headers = new HashMap<String, Object>();
+ Map<String, Object> headers = new HashMap<>();
headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
for (int col = 0; col < column[0].length; col++) {
- headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]);
- headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]);
- headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]);
- headers.put(HbaseAttribute.HBASE_VALUE.asHeader(col + 1), body[0][col][0]);
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(col + 1), body[0][col][0]);
}
template.sendBodyAndHeaders("direct:start", null, headers);
Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
- HTable bar = new HTable(configuration, PERSON_TABLE.getBytes());
+ Connection connection = ConnectionFactory.createConnection(configuration);
+ Table bar = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes()));
for (int col = 0; col < column[0].length; col++) {
Get get = new Get(key[0].getBytes());
@@ -193,15 +200,15 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
for (int col = 0; col < column[0].length; col++) {
- exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]);
}
}
});
for (int col = 0; col < column[0].length; col++) {
- assertEquals(body[0][col][0], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(col + 1)));
+ assertEquals(body[0][col][0], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(col + 1)));
}
}
}
@@ -210,25 +217,25 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
public void testPutAndGetAndDeleteMultiRows() throws Exception {
testPutMultiRows();
if (systemReady) {
- Map<String, Object> headers = new HashMap<String, Object>();
+ Map<String, Object> headers = new HashMap<>();
headers.put(HBaseConstants.OPERATION, HBaseConstants.DELETE);
- headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
template.sendBodyAndHeaders("direct:start", null, headers);
Exchange resp = template.request("direct:start", new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(2), family[0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0][0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(2), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0][0]);
}
});
- assertEquals(null, resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader()));
- assertEquals(body[1][0][0], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2)));
+ assertEquals(null, resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
+ assertEquals(body[1][0][0], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2)));
}
}
@@ -238,15 +245,15 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
if (systemReady) {
Exchange resp = template.request("direct:maxScan", new Processor() {
public void process(Exchange exchange) throws Exception {
- exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
}
});
- Object result1 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(1));
- Object result2 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2));
+ Object result1 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
+ Object result2 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
// as we use maxResults=2 we only get 2 results back
- Object result3 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(3));
+ Object result3 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
assertNull("Should only get 2 results back", result3);
List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0]);
@@ -260,14 +267,14 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
if (systemReady) {
Exchange resp = template.request("direct:scan", new Processor() {
public void process(Exchange exchange) throws Exception {
- exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
}
});
- Object result1 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(1));
- Object result2 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2));
- Object result3 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(3));
+ Object result1 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
+ Object result2 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
+ Object result3 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]);
assertTrue(bodies.contains(result1) && bodies.contains(result2) && bodies.contains(result3));
@@ -277,12 +284,12 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
@Test
public void testPutAndScan() throws Exception {
if (systemReady) {
- Map<String, Object> headers = new HashMap<String, Object>();
+ Map<String, Object> headers = new HashMap<>();
headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
- headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), "1");
- headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), "info");
- headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), "id");
- headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), "3");
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), "1");
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), "info");
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), "id");
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), "3");
template.sendBodyAndHeaders("direct:start", null, headers);
@@ -298,15 +305,15 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
Exchange resp = template.request("direct:scan", new Processor() {
public void process(Exchange exchange) throws Exception {
- exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), "info");
- exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), "id");
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), "info");
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), "id");
}
});
- assertEquals("1", resp.getOut().getHeader(HbaseAttribute.HBASE_ROW_ID.asHeader()));
- assertEquals("info", resp.getOut().getHeader(HbaseAttribute.HBASE_FAMILY.asHeader()));
- assertEquals("id", resp.getOut().getHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader()));
- assertEquals("3", resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader()));
+ assertEquals("1", resp.getOut().getHeader(HBaseAttribute.HBASE_ROW_ID.asHeader()));
+ assertEquals("info", resp.getOut().getHeader(HBaseAttribute.HBASE_FAMILY.asHeader()));
+ assertEquals("id", resp.getOut().getHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader()));
+ assertEquals("3", resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
}
}
@@ -321,13 +328,10 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
public void configure() {
from("direct:start")
.to("hbase://" + PERSON_TABLE);
-
from("direct:start-with-model")
.to("hbase://" + PERSON_TABLE + "?row.family=info&row.qualifier=firstName&row.family2=birthdate&row.qualifier2=year");
-
from("direct:scan")
- .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN);
-
+ .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN);
from("direct:maxScan")
.to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2");
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
index 1baf1de..cea9a2f 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
@@ -24,7 +24,6 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.client.HTable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -32,7 +31,6 @@ import org.junit.Test;
public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport {
IdempotentRepository<Object> repository;
- HTable table;
private String key01 = "123";
private String key02 = "456";
@@ -46,7 +44,6 @@ public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport {
//Ignore if table exists
}
this.repository = new HBaseIdempotentRepository(hbaseUtil.getConfiguration(), PERSON_TABLE, INFO_FAMILY, "mycolumn");
- table = new HTable(hbaseUtil.getConfiguration(), PERSON_TABLE);
super.setUp();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/resources/log4j.properties b/components/camel-hbase/src/test/resources/log4j.properties
index 306ab49..534faf4 100644
--- a/components/camel-hbase/src/test/resources/log4j.properties
+++ b/components/camel-hbase/src/test/resources/log4j.properties
@@ -25,7 +25,6 @@ log4j.logger.org.apache.camel.component.hbase=WARN, out
#log4j.logger.org.apache.camel=DEBUG
#log4j.logger.org.apache.camel.component.hbase=TRACE
-
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index d09fa6d..b5bcac2 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -224,7 +224,7 @@
<hawtdb-version>1.6</hawtdb-version>
<hawtdispatch-version>1.21</hawtdispatch-version>
<hazelcast-version>3.6</hazelcast-version>
- <hbase-version>1.1.1</hbase-version>
+ <hbase-version>1.1.3</hbase-version>
<hbase-bundle-version>1.1.1_1</hbase-bundle-version>
<hibernate-validator-version>5.2.4.Final</hibernate-validator-version>
<!-- Spring 3.2.x and 4.0.x still stick to JPA 2.0. Hibernate 4.3.x upgraded to JPA 2.1. -->