You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/03/18 08:57:06 UTC
[camel] branch master updated: CAMEL-14576: camel-hbase - Test
using test containers (#3645)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 113d871 CAMEL-14576: camel-hbase - Test using test containers (#3645)
113d871 is described below
commit 113d871871f5344ef20bace48a92c390f7df7305
Author: Kirill Yankov <my...@gmail.com>
AuthorDate: Wed Mar 18 17:56:48 2020 +0900
CAMEL-14576: camel-hbase - Test using test containers (#3645)
---
components/camel-hbase/pom.xml | 237 +-----------
.../component/hbase/CamelHBaseFilterTest.java | 34 +-
.../component/hbase/CamelHBaseTestSupport.java | 115 +++---
.../camel/component/hbase/HBaseConsumerTest.java | 32 +-
.../camel/component/hbase/HBaseContainer.java | 68 ++++
.../component/hbase/HBaseConvertionsTest.java | 101 +++---
.../camel/component/hbase/HBaseProducerTest.java | 398 ++++++++++-----------
.../idempotent/HBaseIdempotentRepositoryTest.java | 134 +++----
8 files changed, 457 insertions(+), 662 deletions(-)
diff --git a/components/camel-hbase/pom.xml b/components/camel-hbase/pom.xml
index 8b7c38e..0c83965 100644
--- a/components/camel-hbase/pom.xml
+++ b/components/camel-hbase/pom.xml
@@ -42,21 +42,12 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-support</artifactId>
</dependency>
-
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase-version}</version>
<exclusions>
<exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </exclusion>
- <exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
@@ -70,137 +61,13 @@
</exclusion>
</exclusions>
</dependency>
- <!-- because hbase-client 1.1.1 use hadoop2.5.1 by default, check is it still required by the next version update -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- <version>${hadoop2-version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- because hbase-client 1.1.1 use hadoop2.5.1 by default, check is it still required by the next version update -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop2-version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop2-version}</version>
- <exclusions>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-net</groupId>
- <artifactId>commons-net</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- <version>${commons-configuration-version}</version>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>${commons-lang-version}</version>
- </dependency>
- <dependency>
- <groupId>commons-net</groupId>
- <artifactId>commons-net</artifactId>
- <version>${commons-net-version}</version>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- <version>${jackson-version}</version>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>${jackson-version}</version>
- </dependency>
-
<!-- testing -->
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-test-junit5</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>${hbase-version}</version>
- <classifier>tests</classifier>
+ <artifactId>camel-testcontainers-junit5</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- </exclusion>
- <exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
@@ -208,36 +75,18 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
</exclusions>
</dependency>
- <!-- because hbase-server 1.1.1 use hadoop2.5.1 by default, check is it still required by the next version update -->
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop2-version}</version>
- <classifier>tests</classifier>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>12.0.1</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
</dependency>
- <!-- because hbase-server 1.1.1 use hadoop2.5.1 by default, check is it still required by the next version update -->
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop2-version}</version>
- <classifier>tests</classifier>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.6</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -254,22 +103,9 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
<version>${hbase-version}</version>
- <classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
@@ -283,65 +119,6 @@
</exclusion>
</exclusions>
</dependency>
- <!-- because hbase-testing-util 1.1.1 use hadoop2.5.1 by default, check is it still required by the next version update -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop2-version}</version>
- <type>test-jar</type>
- <classifier>tests</classifier>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- because hbase-testing-util 1.1.1 use hadoop2.5.1 by default, check is it still required by the next version update -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <version>${hadoop2-version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>28.2-jre</version>
- <scope>test</scope>
- </dependency>
- <!-- need to use zookeeper 3.4.x for testing -->
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<!-- logging -->
<dependency>
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
index dbc7b36..0a89426 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
@@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class CamelHBaseFilterTest extends CamelHBaseTestSupport {
@BindToRegistry("myFilters")
- public List<Filter> addFilters() throws Exception {
+ public List<Filter> addFilters() {
List<Filter> filters = new LinkedList<>();
filters.add(new ModelAwareColumnMatchingFilter().getFilteredList()); //not used, filters need to be rethink
return filters;
@@ -43,22 +43,20 @@ public class CamelHBaseFilterTest extends CamelHBaseTestSupport {
@Test
public void testPutMultiRowsAndScanWithFilters() throws Exception {
- if (systemReady) {
- putMultipleRows();
- ProducerTemplate template = context.createProducerTemplate();
- Endpoint endpoint = context.getEndpoint("direct:scan");
+ putMultipleRows();
+ ProducerTemplate template = context.createProducerTemplate();
+ Endpoint endpoint = context.getEndpoint("direct:scan");
- Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]);
- Exchange resp = template.send(endpoint, exchange);
- Message out = resp.getMessage();
- assertTrue(out.getHeaders().containsValue(body[0][0][0])
- && out.getHeaders().containsValue(body[1][0][0])
- && !out.getHeaders().containsValue(body[2][0][0]),
- "two first keys returned");
- }
+ Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]);
+ Exchange resp = template.send(endpoint, exchange);
+ Message out = resp.getMessage();
+ assertTrue(out.getHeaders().containsValue(body[0][0][0])
+ && out.getHeaders().containsValue(body[1][0][0])
+ && !out.getHeaders().containsValue(body[2][0][0]),
+ "two first keys returned");
}
/**
@@ -71,9 +69,9 @@ public class CamelHBaseFilterTest extends CamelHBaseTestSupport {
@Override
public void configure() {
from("direct:start")
- .to("hbase://" + PERSON_TABLE);
+ .to("hbase://" + PERSON_TABLE);
from("direct:scan")
- .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2");
+ .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2");
}
};
}
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
index 5ce1624..7a5366a 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
@@ -17,39 +17,31 @@
package org.apache.camel.component.hbase;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
-import org.apache.camel.CamelContext;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.camel.test.testcontainers.junit5.ContainerAwareTestSupport;
import org.apache.camel.util.IOHelper;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
-public abstract class CamelHBaseTestSupport extends CamelTestSupport {
- //The hbase testing utility has special requirements on the umask.
- //We hold this value to check if the minicluster has properly started and tests can be run.
- protected static Boolean systemReady = true;
+public abstract class CamelHBaseTestSupport extends ContainerAwareTestSupport {
- protected static HBaseTestingUtility hbaseUtil = new HBaseTestingUtility();
- protected static int numServers = 1;
protected static final String PERSON_TABLE = "person";
protected static final String INFO_FAMILY = "info";
- private static final Logger LOG = LoggerFactory.getLogger(CamelHBaseTestSupport.class);
-
protected String[] key = {"1", "2", "3"};
protected final String[] family = {"info", "birthdate", "address"};
protected final String[][] column = {
@@ -70,66 +62,81 @@ public abstract class CamelHBaseTestSupport extends CamelTestSupport {
family[1].getBytes(),
family[2].getBytes()};
- @BeforeAll
- public static void setUpClass() throws Exception {
- try {
- hbaseUtil.startMiniCluster(numServers);
- } catch (Exception e) {
- LOG.warn("couldn't start HBase cluster. Test is not started, but passed!", e);
- systemReady = false;
- }
- }
-
- @AfterAll
- public static void tearDownClass() throws Exception {
- if (systemReady) {
- hbaseUtil.shutdownMiniCluster();
- }
- }
+ // init container once for a class
+ private GenericContainer cont;
@Override
@BeforeEach
public void setUp() throws Exception {
- if (systemReady) {
- try {
- hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), families);
- } catch (TableExistsException ex) {
- //Ignore if table exists
- }
-
- super.setUp();
+ super.setUp();
+ try {
+ createTable(PERSON_TABLE, families);
+ } catch (TableExistsException ex) {
+ //Ignore if table exists
}
}
@Override
@AfterEach
public void tearDown() throws Exception {
- if (systemReady) {
- hbaseUtil.deleteTable(PERSON_TABLE.getBytes());
- super.tearDown();
+ try {
+ deleteTable(PERSON_TABLE);
+ } catch (TableNotFoundException e) {
+ // skip
}
+ super.tearDown();
}
@Override
- public CamelContext createCamelContext() throws Exception {
- CamelContext context = new DefaultCamelContext(createCamelRegistry());
- // configure hbase component
- HBaseComponent component = context.getComponent("hbase", HBaseComponent.class);
- component.setConfiguration(hbaseUtil.getConfiguration());
- return context;
+ protected GenericContainer<?> createContainer() {
+ if (cont == null) {
+ cont = new HBaseContainer();
+ }
+ return cont;
}
- protected void putMultipleRows() throws IOException {
- Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
- Connection connection = ConnectionFactory.createConnection(configuration);
- Table table = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes()));
+ @Override
+ protected long containersStartupTimeout() {
+ // on my laptop it takes around 30-60 seconds to start the cluster.
+ return TimeUnit.MINUTES.toSeconds(5);
+ }
+ protected void putMultipleRows() throws IOException {
+ Table table = connectHBase().getTable(TableName.valueOf(PERSON_TABLE.getBytes()));
for (int r = 0; r < key.length; r++) {
Put put = new Put(key[r].getBytes());
put.addColumn(family[0].getBytes(), column[0][0].getBytes(), body[r][0][0].getBytes());
table.put(put);
}
-
IOHelper.close(table);
}
+
+ protected Configuration getHBaseConfig() {
+ return HBaseContainer.defaultConf();
+ }
+
+ protected Connection connectHBase() throws IOException {
+ Connection connection = ConnectionFactory.createConnection(getHBaseConfig());
+ return connection;
+ }
+
+ protected void createTable(String name, byte[][] families) throws IOException {
+ HTableDescriptor descr = new HTableDescriptor(TableName.valueOf(name));
+ for (byte[] fam : families) {
+ HColumnDescriptor cdescr = new HColumnDescriptor(fam);
+ descr.addFamily(cdescr);
+ }
+ connectHBase().getAdmin().createTable(descr);
+ }
+
+ protected void createTable(String name, String family) throws IOException {
+ createTable(name, new byte[][]{family.getBytes()});
+ }
+
+ protected void deleteTable(String name) throws IOException {
+ Admin admin = connectHBase().getAdmin();
+ TableName tname = TableName.valueOf(name);
+ admin.disableTable(tname);
+ admin.deleteTable(tname);
+ }
}
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
index 5b4cdcc..65ab225 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
@@ -27,23 +27,21 @@ public class HBaseConsumerTest extends CamelHBaseTestSupport {
@Test
public void testPutMultiRowsAndConsume() throws Exception {
- if (systemReady) {
- MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
- mockEndpoint.expectedMessageCount(3);
-
- Map<String, Object> headers = new HashMap<>();
- for (int row = 0; row < key.length; row++) {
- headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
- headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
- headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
- headers.put(HBaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]);
- }
- headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
+ MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
+ mockEndpoint.expectedMessageCount(3);
+
+ Map<String, Object> headers = new HashMap<>();
+ for (int row = 0; row < key.length; row++) {
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]);
+ }
+ headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
- template.sendBodyAndHeaders("direct:start", null, headers);
+ template.sendBodyAndHeaders("direct:start", null, headers);
- mockEndpoint.assertIsSatisfied();
- }
+ mockEndpoint.assertIsSatisfied();
}
/**
@@ -56,9 +54,9 @@ public class HBaseConsumerTest extends CamelHBaseTestSupport {
@Override
public void configure() {
from("direct:start")
- .to("hbase://" + PERSON_TABLE);
+ .to("hbase://" + PERSON_TABLE);
from("hbase://" + PERSON_TABLE)
- .to("mock:result");
+ .to("mock:result");
}
};
}
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseContainer.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseContainer.java
new file mode 100644
index 0000000..f35a0bb
--- /dev/null
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseContainer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hbase;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.testcontainers.containers.GenericContainer;
+
+/**
+ * Currently there is no official HBase docker image.
+ * This is dummy implementation of the HBase container.
+ */
+public class HBaseContainer extends GenericContainer {
+
+ // must be the same as in the config of camel component
+ private static final Integer CLIENT_PORT = 21818;
+
+ private final HBaseTestingUtility hbaseUtil;
+
+ public HBaseContainer() {
+ hbaseUtil = new HBaseTestingUtility(defaultConf());
+ }
+
+ public HBaseContainer(Configuration conf) {
+ hbaseUtil = new HBaseTestingUtility(conf);
+ }
+
+ @Override
+ public void start() {
+ try {
+ hbaseUtil.startMiniCluster(1);
+ } catch (Exception e) {
+ logger().warn("couldn't start HBase cluster. Test is not started, but passed!", e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ hbaseUtil.shutdownMiniCluster();
+ } catch (Exception e) {
+ logger().warn("Error shutting down the HBase container", e);
+ }
+ }
+
+ public static Configuration defaultConf() {
+ Configuration conf = HBaseConfiguration.create();
+ conf.set("test.hbase.zookeeper.property.clientPort", CLIENT_PORT.toString());
+ return conf;
+ }
+
+}
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
index 1154a6e..54142a5 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
@@ -22,10 +22,8 @@ import java.util.Map;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.util.IOHelper;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
@@ -43,55 +41,52 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport {
@Test
public void testPutMultiRows() throws Exception {
- if (systemReady) {
- ProducerTemplate template = context.createProducerTemplate();
- Map<String, Object> headers = new HashMap<>();
- headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
- headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), INFO_FAMILY);
- headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
- headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0]);
-
- headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
- headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(2), INFO_FAMILY);
- headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]);
- headers.put(HBaseAttribute.HBASE_VALUE.asHeader(2), body[1]);
-
- headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(3), key[2]);
- headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(3), INFO_FAMILY);
- headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(3), column[0]);
- headers.put(HBaseAttribute.HBASE_VALUE.asHeader(3), body[2]);
-
- headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
-
- template.sendBodyAndHeaders("direct:start", null, headers);
-
- Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
- Connection conn = ConnectionFactory.createConnection(configuration);
- Table bar = conn.getTable(TableName.valueOf(PERSON_TABLE));
- Get get = new Get(Bytes.toBytes((Integer) key[0]));
-
- //Check row 1
- get.addColumn(INFO_FAMILY.getBytes(), column[0].getBytes());
- Result result = bar.get(get);
- byte[] resultValue = result.value();
- assertArrayEquals(Bytes.toBytes((Long) body[0]), resultValue);
-
- //Check row 2
- get = new Get(Bytes.toBytes((String) key[1]));
- get.addColumn(INFO_FAMILY.getBytes(), column[0].getBytes());
- result = bar.get(get);
- resultValue = result.value();
- assertArrayEquals(Bytes.toBytes((Boolean) body[1]), resultValue);
-
- //Check row 3
- get = new Get(Bytes.toBytes((String) key[2]));
- get.addColumn(INFO_FAMILY.getBytes(), column[0].getBytes());
- result = bar.get(get);
- resultValue = result.value();
- assertArrayEquals(Bytes.toBytes((String) body[2]), resultValue);
-
- IOHelper.close(bar);
- }
+ ProducerTemplate template = context.createProducerTemplate();
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), INFO_FAMILY);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0]);
+
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(2), INFO_FAMILY);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(2), body[1]);
+
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(3), key[2]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(3), INFO_FAMILY);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(3), column[0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(3), body[2]);
+
+ headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
+
+ template.sendBodyAndHeaders("direct:start", null, headers);
+
+ Connection conn = connectHBase();
+ Table bar = conn.getTable(TableName.valueOf(PERSON_TABLE));
+ Get get = new Get(Bytes.toBytes((Integer) key[0]));
+
+ //Check row 1
+ get.addColumn(INFO_FAMILY.getBytes(), column[0].getBytes());
+ Result result = bar.get(get);
+ byte[] resultValue = result.value();
+ assertArrayEquals(Bytes.toBytes((Long) body[0]), resultValue);
+
+ //Check row 2
+ get = new Get(Bytes.toBytes((String) key[1]));
+ get.addColumn(INFO_FAMILY.getBytes(), column[0].getBytes());
+ result = bar.get(get);
+ resultValue = result.value();
+ assertArrayEquals(Bytes.toBytes((Boolean) body[1]), resultValue);
+
+ //Check row 3
+ get = new Get(Bytes.toBytes((String) key[2]));
+ get.addColumn(INFO_FAMILY.getBytes(), column[0].getBytes());
+ result = bar.get(get);
+ resultValue = result.value();
+ assertArrayEquals(Bytes.toBytes((String) body[2]), resultValue);
+
+ IOHelper.close(bar);
}
/**
@@ -104,9 +99,9 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport {
@Override
public void configure() {
from("direct:start")
- .to("hbase://" + PERSON_TABLE);
+ .to("hbase://" + PERSON_TABLE);
from("direct:scan")
- .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&row.family=family1&row.qualifier=column1");
+ .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&row.family=family1&row.qualifier=column1");
}
};
}
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
index ac1c92e..f46e490 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
@@ -24,10 +24,8 @@ import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.util.IOHelper;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
@@ -42,290 +40,268 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
@Test
public void testPut() throws Exception {
- if (systemReady) {
- Map<String, Object> headers = new HashMap<>();
- headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
- headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
- headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]);
- headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
- template.sendBodyAndHeaders("direct:start", null, headers);
-
- Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
- Connection connection = ConnectionFactory.createConnection(configuration);
- Table table = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes()));
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]);
+ headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
+ template.sendBodyAndHeaders("direct:start", null, headers);
- Get get = new Get(key[0].getBytes());
+ Connection connection = connectHBase();
+ Table table = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes()));
- get.addColumn(family[0].getBytes(), column[0][0].getBytes());
- Result result = table.get(get);
- byte[] resultValue = result.value();
- assertArrayEquals(body[0][0][0].getBytes(), resultValue);
+ Get get = new Get(key[0].getBytes());
- IOHelper.close(table);
- }
+ get.addColumn(family[0].getBytes(), column[0][0].getBytes());
+ Result result = table.get(get);
+ byte[] resultValue = result.value();
+ assertArrayEquals(body[0][0][0].getBytes(), resultValue);
+
+ IOHelper.close(table);
}
+
@Test
public void testPutAndGet() throws Exception {
testPut();
- if (systemReady) {
- Exchange resp = template.request("direct:start", exchange -> {
- exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
- exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
- });
-
- assertEquals(body[0][0][0], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
- }
+
+ Exchange resp = template.request("direct:start", exchange -> {
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
+ });
+
+ assertEquals(body[0][0][0], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
}
@Test
public void testPutAndGetWithModel() throws Exception {
- if (systemReady) {
- Map<String, Object> headers = new HashMap<>();
- headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
- int index = 1;
- for (int row = 0; row < key.length; row++) {
- for (int fam = 0; fam < family.length; fam++) {
- for (int col = 0; col < column[fam].length; col++) {
- headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(index), key[row]);
- headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(index), family[fam]);
- headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(index), column[fam][col]);
- headers.put(HBaseAttribute.HBASE_VALUE.asHeader(index++), body[row][fam][col]);
- }
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
+ int index = 1;
+ for (int row = 0; row < key.length; row++) {
+ for (int fam = 0; fam < family.length; fam++) {
+ for (int col = 0; col < column[fam].length; col++) {
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(index), key[row]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(index), family[fam]);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(index), column[fam][col]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(index++), body[row][fam][col]);
}
}
+ }
- template.sendBodyAndHeaders("direct:start", null, headers);
+ template.sendBodyAndHeaders("direct:start", null, headers);
- Exchange resp = template.request("direct:start-with-model", exchange -> {
- exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
- exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
- });
+ Exchange resp = template.request("direct:start-with-model", exchange -> {
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
+ });
- assertEquals(body[0][0][1], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
- assertEquals(body[0][1][2], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2)));
- }
+ assertEquals(body[0][0][1], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
+ assertEquals(body[0][1][2], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2)));
}
@Test
public void testPutMultiRows() throws Exception {
- if (systemReady) {
- Map<String, Object> headers = new HashMap<>();
- headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
- for (int row = 0; row < key.length; row++) {
- headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
- headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
- headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
- headers.put(HBaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]);
- }
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
+ for (int row = 0; row < key.length; row++) {
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]);
+ }
- template.sendBodyAndHeaders("direct:start", null, headers);
+ template.sendBodyAndHeaders("direct:start", null, headers);
- Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
- Connection conn = ConnectionFactory.createConnection(configuration);
- Table bar = conn.getTable(TableName.valueOf(PERSON_TABLE));
+ Connection conn = connectHBase();
+ Table bar = conn.getTable(TableName.valueOf(PERSON_TABLE));
- //Check row 1
- for (int row = 0; row < key.length; row++) {
- Get get = new Get(key[row].getBytes());
- get.addColumn(family[0].getBytes(), column[0][0].getBytes());
- Result result = bar.get(get);
- byte[] resultValue = result.value();
- assertArrayEquals(body[row][0][0].getBytes(), resultValue);
- }
-
- IOHelper.close(bar);
+ //Check row 1
+ for (int row = 0; row < key.length; row++) {
+ Get get = new Get(key[row].getBytes());
+ get.addColumn(family[0].getBytes(), column[0][0].getBytes());
+ Result result = bar.get(get);
+ byte[] resultValue = result.value();
+ assertArrayEquals(body[row][0][0].getBytes(), resultValue);
}
+
+ IOHelper.close(bar);
}
@Test
public void testPutAndGetMultiRows() throws Exception {
testPutMultiRows();
- if (systemReady) {
- Exchange resp = template.request("direct:start", exchange -> {
- exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
- for (int row = 0; row < key.length; row++) {
- exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
- }
- });
+ Exchange resp = template.request("direct:start", exchange -> {
+ exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
for (int row = 0; row < key.length; row++) {
- assertEquals(body[row][0][0], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(row + 1)));
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]);
}
+ });
+
+ for (int row = 0; row < key.length; row++) {
+ assertEquals(body[row][0][0], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(row + 1)));
}
}
@Test
public void testPutMultiColumns() throws Exception {
- if (systemReady) {
- Map<String, Object> headers = new HashMap<>();
- headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
- for (int col = 0; col < column[0].length; col++) {
- headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]);
- headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]);
- headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]);
- headers.put(HBaseAttribute.HBASE_VALUE.asHeader(col + 1), body[0][col][0]);
- }
- template.sendBodyAndHeaders("direct:start", null, headers);
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
+ for (int col = 0; col < column[0].length; col++) {
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]);
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]);
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]);
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(col + 1), body[0][col][0]);
+ }
- Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
- Connection connection = ConnectionFactory.createConnection(configuration);
- Table bar = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes()));
+ template.sendBodyAndHeaders("direct:start", null, headers);
- for (int col = 0; col < column[0].length; col++) {
- Get get = new Get(key[0].getBytes());
- get.addColumn(family[0].getBytes(), column[0][col].getBytes());
- Result result = bar.get(get);
- byte[] resultValue = result.value();
- assertArrayEquals(body[0][col][0].getBytes(), resultValue);
- }
+ Connection connection = connectHBase();
+ Table bar = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes()));
- IOHelper.close(bar);
+ for (int col = 0; col < column[0].length; col++) {
+ Get get = new Get(key[0].getBytes());
+ get.addColumn(family[0].getBytes(), column[0][col].getBytes());
+ Result result = bar.get(get);
+ byte[] resultValue = result.value();
+ assertArrayEquals(body[0][col][0].getBytes(), resultValue);
}
+
+ IOHelper.close(bar);
}
@Test
public void testPutAndGetMultiColumns() throws Exception {
testPutMultiColumns();
- if (systemReady) {
- Exchange resp = template.request("direct:start", exchange -> {
- exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
- for (int col = 0; col < column[0].length; col++) {
- exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]);
- }
- });
+ Exchange resp = template.request("direct:start", exchange -> {
+ exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
for (int col = 0; col < column[0].length; col++) {
- assertEquals(body[0][col][0], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(col + 1)));
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]);
}
+ });
+
+ for (int col = 0; col < column[0].length; col++) {
+ assertEquals(body[0][col][0], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(col + 1)));
}
}
@Test
public void testPutAndGetAndDeleteMultiRows() throws Exception {
testPutMultiRows();
- if (systemReady) {
- Map<String, Object> headers = new HashMap<>();
- headers.put(HBaseConstants.OPERATION, HBaseConstants.DELETE);
- headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
- template.sendBodyAndHeaders("direct:start", null, headers);
-
- Exchange resp = template.request("direct:start", exchange -> {
- exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(2), family[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0][0]);
- });
-
- assertEquals(null, resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
- assertEquals(body[1][0][0], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2)));
- }
+
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(HBaseConstants.OPERATION, HBaseConstants.DELETE);
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ template.sendBodyAndHeaders("direct:start", null, headers);
+
+ Exchange resp = template.request("direct:start", exchange -> {
+ exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(2), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0][0]);
+ });
+
+ assertEquals(null, resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
+ assertEquals(body[1][0][0], resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2)));
}
@Test
public void testPutMultiRowsAndMaxScan() throws Exception {
testPutMultiRows();
- if (systemReady) {
- Exchange resp = template.request("direct:maxScan", exchange -> {
- exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
- });
-
- Object result1 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
- Object result2 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
- // as we use maxResults=2 we only get 2 results back
- Object result3 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
- assertNull(result3, "Should only get 2 results back");
-
- List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0]);
- assertTrue(bodies.contains(result1) && bodies.contains(result2));
- }
+
+ Exchange resp = template.request("direct:maxScan", exchange -> {
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ });
+
+ Object result1 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
+ Object result2 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
+ // as we use maxResults=2 we only get 2 results back
+ Object result3 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
+ assertNull(result3, "Should only get 2 results back");
+
+ List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0]);
+ assertTrue(bodies.contains(result1) && bodies.contains(result2));
}
@Test
public void testPutMultiRowsAndScan() throws Exception {
testPutMultiRows();
- if (systemReady) {
- Exchange resp = template.request("direct:scan", exchange -> {
- exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
- });
-
- Object result1 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
- Object result2 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
- Object result3 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
-
- List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]);
- assertTrue(bodies.contains(result1) && bodies.contains(result2) && bodies.contains(result3));
- }
+ Exchange resp = template.request("direct:scan", exchange -> {
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ });
+
+ Object result1 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
+ Object result2 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
+ Object result3 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
+
+ List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]);
+ assertTrue(bodies.contains(result1) && bodies.contains(result2) && bodies.contains(result3));
}
-
+
@Test
public void testPutMultiRowsAndScanWithStop() throws Exception {
testPutMultiRows();
- if (systemReady) {
- Exchange resp = template.request("direct:scan", exchange -> {
- exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
- exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
- exchange.getIn().setHeader(HBaseConstants.FROM_ROW, key[0]);
- exchange.getIn().setHeader(HBaseConstants.STOP_ROW, key[1]);
- });
-
- Object result1 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
- Object result2 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
- Object result3 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
-
- List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]);
- assertTrue(bodies.contains(result1) && !bodies.contains(result2) && !bodies.contains(result3));
- }
+ Exchange resp = template.request("direct:scan", exchange -> {
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+ exchange.getIn().setHeader(HBaseConstants.FROM_ROW, key[0]);
+ exchange.getIn().setHeader(HBaseConstants.STOP_ROW, key[1]);
+ });
+
+ Object result1 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1));
+ Object result2 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2));
+ Object result3 = resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3));
+
+ List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]);
+ assertTrue(bodies.contains(result1) && !bodies.contains(result2) && !bodies.contains(result3));
}
@Test
public void testPutAndScan() throws Exception {
- if (systemReady) {
- Map<String, Object> headers = new HashMap<>();
- headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
- headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), "1");
- headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), "info");
- headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), "id");
- headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), "3");
-
- template.sendBodyAndHeaders("direct:start", null, headers);
-
- Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
-
- Connection conn = ConnectionFactory.createConnection(configuration);
- Table bar = conn.getTable(TableName.valueOf(PERSON_TABLE));
-
- Get get = new Get("1".getBytes());
- get.addColumn("info".getBytes(), "id".getBytes());
- Result result = bar.get(get);
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
+ headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), "1");
+ headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), "info");
+ headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), "id");
+ headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), "3");
- assertArrayEquals("3".getBytes(), result.value());
+ template.sendBodyAndHeaders("direct:start", null, headers);
- IOHelper.close(bar);
+ Connection conn = connectHBase();
+ Table bar = conn.getTable(TableName.valueOf(PERSON_TABLE));
- Exchange resp = template.request("direct:scan", exchange -> {
- exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), "info");
- exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), "id");
- });
+ Get get = new Get("1".getBytes());
+ get.addColumn("info".getBytes(), "id".getBytes());
+ Result result = bar.get(get);
- assertEquals("1", resp.getMessage().getHeader(HBaseAttribute.HBASE_ROW_ID.asHeader()));
- assertEquals("info", resp.getMessage().getHeader(HBaseAttribute.HBASE_FAMILY.asHeader()));
- assertEquals("id", resp.getMessage().getHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader()));
- assertEquals("3", resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
- }
+ assertArrayEquals("3".getBytes(), result.value());
+
+ IOHelper.close(bar);
+
+ Exchange resp = template.request("direct:scan", exchange -> {
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), "info");
+ exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), "id");
+ });
+
+ assertEquals("1", resp.getMessage().getHeader(HBaseAttribute.HBASE_ROW_ID.asHeader()));
+ assertEquals("info", resp.getMessage().getHeader(HBaseAttribute.HBASE_FAMILY.asHeader()));
+ assertEquals("id", resp.getMessage().getHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader()));
+ assertEquals("3", resp.getMessage().getHeader(HBaseAttribute.HBASE_VALUE.asHeader()));
}
/**
@@ -338,13 +314,13 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
@Override
public void configure() {
from("direct:start")
- .to("hbase://" + PERSON_TABLE);
+ .to("hbase://" + PERSON_TABLE);
from("direct:start-with-model")
- .to("hbase://" + PERSON_TABLE + "?row.family=info&row.qualifier=firstName&row.family2=birthdate&row.qualifier2=year");
+ .to("hbase://" + PERSON_TABLE + "?row.family=info&row.qualifier=firstName&row.family2=birthdate&row.qualifier2=year");
from("direct:scan")
- .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN);
+ .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN);
from("direct:maxScan")
- .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2");
+ .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2");
}
};
}
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
index cd1797e..e8e3304 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
@@ -16,12 +16,9 @@
*/
package org.apache.camel.component.hbase.processor.idempotent;
-import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.hbase.CamelHBaseTestSupport;
-import org.apache.camel.component.hbase.HBaseHelper;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.hadoop.hbase.TableExistsException;
import org.junit.jupiter.api.AfterEach;
@@ -41,114 +38,93 @@ public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport {
@Override
@BeforeEach
public void setUp() throws Exception {
- if (systemReady) {
- try {
- hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), HBaseHelper.getHBaseFieldAsBytes(INFO_FAMILY));
- } catch (TableExistsException ex) {
- //Ignore if table exists
- }
- this.repository = new HBaseIdempotentRepository(hbaseUtil.getConfiguration(), PERSON_TABLE, INFO_FAMILY, "mycolumn");
- super.setUp();
+ this.repository = new HBaseIdempotentRepository(getHBaseConfig(), PERSON_TABLE, INFO_FAMILY, "mycolumn");
+ super.setUp();
+ try {
+ createTable(PERSON_TABLE, INFO_FAMILY);
+ } catch (TableExistsException ex) {
+ //Ignore if table exists
}
}
@Override
@AfterEach
public void tearDown() throws Exception {
- if (systemReady) {
- hbaseUtil.deleteTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE));
- super.setUp();
- }
+ deleteTable(PERSON_TABLE);
+ super.tearDown();
}
@Test
public void testAdd() throws Exception {
- if (systemReady) {
- // add first key
- assertTrue(repository.add(key01));
- assertTrue(repository.contains(key01));
+ // add first key
+ assertTrue(repository.add(key01));
+ assertTrue(repository.contains(key01));
- // try to add an other one
- assertTrue(repository.add(key02));
- assertTrue(repository.contains(key02));
+ // try to add an other one
+ assertTrue(repository.add(key02));
+ assertTrue(repository.contains(key02));
- // try to add the first key again
- assertFalse(repository.add(key01));
- }
+ // try to add the first key again
+ assertFalse(repository.add(key01));
}
@Test
public void testContains() throws Exception {
- if (systemReady) {
- assertFalse(repository.contains(key01));
+ assertFalse(repository.contains(key01));
- // add key and check again
- assertTrue(repository.add(key01));
- assertTrue(repository.contains(key01));
- }
+ // add key and check again
+ assertTrue(repository.add(key01));
+ assertTrue(repository.contains(key01));
}
@Test
public void testRemove() throws Exception {
- if (systemReady) {
- // add key to remove
- assertTrue(repository.add(key01));
- assertTrue(repository.contains(key01));
- // assertEquals(1, dataSet.size());
-
- // remove key
- assertTrue(repository.remove(key01));
- //assertEquals(0, dataSet.size());
-
- // try to remove a key that isn't there
- assertFalse(repository.remove(key02));
- }
+ // add key to remove
+ assertTrue(repository.add(key01));
+ assertTrue(repository.contains(key01));
+ // assertEquals(1, dataSet.size());
+
+ // remove key
+ assertTrue(repository.remove(key01));
+ //assertEquals(0, dataSet.size());
+
+ // try to remove a key that isn't there
+ assertFalse(repository.remove(key02));
}
-
+
@Test
public void testClear() throws Exception {
- if (systemReady) {
- // add key to remove
- assertTrue(repository.add(key01));
- assertTrue(repository.add(key02));
- assertTrue(repository.contains(key01));
- assertTrue(repository.contains(key02));
-
- // remove key
- repository.clear();
-
- assertFalse(repository.contains(key01));
- assertFalse(repository.contains(key02));
- }
+ // add key to remove
+ assertTrue(repository.add(key01));
+ assertTrue(repository.add(key02));
+ assertTrue(repository.contains(key01));
+ assertTrue(repository.contains(key02));
+
+ // remove key
+ repository.clear();
+
+ assertFalse(repository.contains(key01));
+ assertFalse(repository.contains(key02));
}
@Test
public void testConfirm() throws Exception {
- if (systemReady) {
- // it always return true
- assertTrue(repository.confirm(key01));
- }
+ // it always return true
+ assertTrue(repository.confirm(key01));
}
@Test
public void testRepositoryInRoute() throws Exception {
- if (systemReady) {
- MockEndpoint mock = (MockEndpoint) context.getEndpoint("mock:out");
- mock.expectedBodiesReceived("a", "b");
- // c is a duplicate
+ MockEndpoint mock = (MockEndpoint) context.getEndpoint("mock:out");
+ mock.expectedBodiesReceived("a", "b");
+ // c is a duplicate
- // send 3 message with one duplicated key (key01)
- template.sendBodyAndHeader("direct:in", "a", "messageId", key01);
- template.sendBodyAndHeader("direct:in", "b", "messageId", key02);
- template.sendBodyAndHeader("direct:in", "c", "messageId", key01);
+ // send 3 message with one duplicated key (key01)
+ template.sendBodyAndHeader("direct:in", "a", "messageId", key01);
+ template.sendBodyAndHeader("direct:in", "b", "messageId", key02);
+ template.sendBodyAndHeader("direct:in", "c", "messageId", key01);
- assertMockEndpointsSatisfied();
- }
- }
-
- @Override
- public CamelContext createCamelContext() throws Exception {
- return new DefaultCamelContext(createCamelRegistry());
+ assertMockEndpointsSatisfied();
}
@Override
@@ -157,8 +133,8 @@ public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport {
@Override
public void configure() throws Exception {
from("direct:in")
- .idempotentConsumer(header("messageId"), repository)
- .to("mock:out");
+ .idempotentConsumer(header("messageId"), repository)
+ .to("mock:out");
}
};
}