You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/17 06:04:15 UTC
[flink] branch release-1.11 updated: [FLINK-18072][hbase] Fix
HBaseLookupFunction can not work with new internal data structure RowData
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new b2711c5 [FLINK-18072][hbase] Fix HBaseLookupFunction can not work with new internal data structure RowData
b2711c5 is described below
commit b2711c5d7d197735e2cbedf4e675273ef7e9a3bb
Author: Leonard Xu <xb...@163.com>
AuthorDate: Wed Jun 17 14:01:36 2020 +0800
[FLINK-18072][hbase] Fix HBaseLookupFunction can not work with new internal data structure RowData
This closes #12594
---
.../hbase/source/HBaseDynamicTableSource.java | 2 +-
.../hbase/source/HBaseLookupFunction.java | 2 +-
...nction.java => HBaseRowDataLookupFunction.java} | 51 +--
.../flink/connector/hbase/util/HBaseSerde.java | 26 ++
.../connector/hbase/HBaseConnectorITCase.java | 423 ++++++++-------------
.../hbase/HBaseDynamicTableFactoryTest.java | 6 +-
.../flink/connector/hbase/util/HBaseTestBase.java | 68 +++-
7 files changed, 280 insertions(+), 298 deletions(-)
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
index 1dac67ae..537d4b2 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
@@ -77,7 +77,7 @@ public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSour
.isPresent(),
"Currently, HBase table only supports lookup by rowkey field.");
- return TableFunctionProvider.of(new HBaseLookupFunction(conf, tableName, hbaseSchema));
+ return TableFunctionProvider.of(new HBaseRowDataLookupFunction(conf, tableName, hbaseSchema, nullStringLiteral));
}
@Override
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
index 1d608e9..c84cbe1 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
@@ -45,7 +45,7 @@ import java.io.IOException;
/**
* The HBaseLookupFunction is a standard user-defined table function, it can be used in tableAPI
- * and also useful for temporal table join plan in SQL.
+ * and also useful for temporal table join plan in SQL. It looks up the result as {@link Row}.
*/
@Internal
public class HBaseLookupFunction extends TableFunction<Row> {
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
similarity index 78%
copy from flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
copy to flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
index 1d608e9..6a36fa3 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
@@ -20,22 +20,21 @@ package org.apache.flink.connector.hbase.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
-import org.apache.flink.connector.hbase.util.HBaseReadWriteHelper;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.slf4j.Logger;
@@ -44,29 +43,33 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
- * The HBaseLookupFunction is a standard user-defined table function, it can be used in tableAPI
- * and also useful for temporal table join plan in SQL.
+ * The HBaseRowDataLookupFunction is a standard user-defined table function, it can be used in tableAPI
+ * and also useful for temporal table join plan in SQL. It looks up the result as {@link RowData}.
*/
@Internal
-public class HBaseLookupFunction extends TableFunction<Row> {
- private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupFunction.class);
+public class HBaseRowDataLookupFunction extends TableFunction<RowData> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataLookupFunction.class);
private static final long serialVersionUID = 1L;
private final String hTableName;
private final byte[] serializedConfig;
private final HBaseTableSchema hbaseTableSchema;
+ private final String nullStringLiteral;
- private transient HBaseReadWriteHelper readHelper;
private transient Connection hConnection;
private transient HTable table;
+ private transient HBaseSerde serde;
- public HBaseLookupFunction(
+ public HBaseRowDataLookupFunction(
Configuration configuration,
String hTableName,
- HBaseTableSchema hbaseTableSchema) {
+ HBaseTableSchema hbaseTableSchema,
+ String nullStringLiteral) {
this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
this.hTableName = hTableName;
this.hbaseTableSchema = hbaseTableSchema;
+ this.nullStringLiteral = nullStringLiteral;
}
/**
@@ -75,25 +78,23 @@ public class HBaseLookupFunction extends TableFunction<Row> {
*/
public void eval(Object rowKey) throws IOException {
// fetch result
- Result result = table.get(readHelper.createGet(rowKey));
- if (!result.isEmpty()) {
- // parse and collect
- collect(readHelper.parseToRow(result, rowKey));
+ Get get = serde.createGet(rowKey);
+ if (get != null) {
+ Result result = table.get(get);
+ if (!result.isEmpty()) {
+ // parse and collect
+ collect(serde.convertToRow(result));
+ }
}
}
- @Override
- public TypeInformation<Row> getResultType() {
- return hbaseTableSchema.convertsToTableSchema().toRowType();
- }
-
- private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() {
+ private Configuration prepareRuntimeConfiguration() {
// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
// and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
// user params from client-side have the highest priority
- org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
+ Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
serializedConfig,
- HBaseConfiguration.create());
+ HBaseConfigurationUtil.getHBaseConfiguration());
// do validation: check key option(s) in final runtime configuration
if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
@@ -107,7 +108,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
@Override
public void open(FunctionContext context) {
LOG.info("start open ...");
- org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
+ Configuration config = prepareRuntimeConfiguration();
try {
hConnection = ConnectionFactory.createConnection(config);
table = (HTable) hConnection.getTable(TableName.valueOf(hTableName));
@@ -118,7 +119,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
LOG.error("Exception while creating connection to HBase.", ioe);
throw new RuntimeException("Cannot create connection to HBase.", ioe);
}
- this.readHelper = new HBaseReadWriteHelper(hbaseTableSchema);
+ this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
LOG.info("end open.");
}
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index 693cb62..f888914 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -76,6 +77,7 @@ public class HBaseSerde {
private final @Nullable FieldDecoder keyDecoder;
private final FieldEncoder[][] qualifierEncoders;
private final FieldDecoder[][] qualifierDecoders;
+ private final GenericRowData rowWithRowKey;
public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) {
this.families = hbaseSchema.getFamilyKeys();
@@ -115,6 +117,7 @@ public class HBaseSerde {
.toArray(FieldDecoder[]::new);
this.reusedFamilyRows[f] = new GenericRowData(dataTypes.length);
}
+ this.rowWithRowKey = new GenericRowData(1);
}
/**
@@ -196,6 +199,29 @@ public class HBaseSerde {
}
/**
+ * Returns an instance of Get that retrieves the matches records from the HBase table.
+ *
+ * @return The appropriate instance of Get for this use case.
+ */
+ public Get createGet(Object rowKey) {
+ checkArgument(keyEncoder != null, "row key is not set.");
+ rowWithRowKey.setField(0, rowKey);
+ byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0);
+ if (rowkey.length == 0) {
+ // drop dirty records, rowkey shouldn't be zero length
+ return null;
+ }
+ Get get = new Get(rowkey);
+ for (int f = 0; f < families.length; f++) {
+ byte[] family = families[f];
+ for (byte[] qualifier : qualifiers[f]) {
+ get.addColumn(family, qualifier);
+ }
+ }
+ return get;
+ }
+
+ /**
* Converts HBase {@link Result} into {@link RowData}.
*/
public RowData convertToRow(Result result) {
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
index 3777dec..9ab36cc 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.hbase.source.HBaseInputFormat;
import org.apache.flink.connector.hbase.source.HBaseTableSource;
-import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase.util.HBaseTestBase;
import org.apache.flink.connector.hbase.util.PlannerType;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -40,20 +39,17 @@ import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableImpl;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil;
import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
import org.apache.flink.table.planner.sinks.CollectRowTableSink;
import org.apache.flink.table.planner.sinks.CollectTableSink;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.utils.StreamITCase;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.TableSource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
@@ -61,26 +57,15 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+import java.util.stream.Collectors;
import scala.Option;
import static org.apache.flink.connector.hbase.util.PlannerType.OLD_PLANNER;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.junit.Assert.assertEquals;
/**
@@ -124,7 +109,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
- hbaseTable.setRowKey("rowkey", Integer.class);
+ hbaseTable.setRowKey(ROW_KEY, Integer.class);
((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable);
} else {
tEnv.executeSql(
@@ -176,7 +161,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
- hbaseTable.setRowKey("rowkey", Integer.class);
+ hbaseTable.setRowKey(ROW_KEY, Integer.class);
((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable);
} else {
tEnv.executeSql(
@@ -221,7 +206,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
if (isLegacyConnector) {
HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
// shuffle order of column registration
- hbaseTable.setRowKey("rowkey", Integer.class);
+ hbaseTable.setRowKey(ROW_KEY, Integer.class);
hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
@@ -268,7 +253,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class);
hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class);
- hbaseTable.setRowKey("rowkey", Integer.class);
+ hbaseTable.setRowKey(ROW_KEY, Integer.class);
((TableEnvironmentInternal) tEnv).registerTableSourceInternal("hTable", hbaseTable);
} else {
tEnv.executeSql(
@@ -319,115 +304,33 @@ public class HBaseConnectorITCase extends HBaseTestBase {
assertEquals(360, (int) resultSet.get(0).f0);
}
- // -------------------------------------------------------------------------------------
- // HBaseTableSink tests
- // -------------------------------------------------------------------------------------
-
- // prepare a source collection.
- private static final List<Row> testData1 = new ArrayList<>();
- private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
- new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE,
- Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.SQL_TIME},
- new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3", "f4c1", "f4c2", "f4c3"});
-
- static {
- testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
- Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:00:00")));
- testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
- Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00")));
- testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
- Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00")));
- testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4",
- Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00")));
- testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
- Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00")));
- testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
- Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00")));
- testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
- Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00")));
- testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8",
- Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00")));
- }
-
@Test
public void testTableSink() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
- if (isLegacyConnector) {
- HBaseTableSchema schema = new HBaseTableSchema();
- schema.addColumn(FAMILY1, F1COL1, Integer.class);
- schema.addColumn(FAMILY2, F2COL1, String.class);
- schema.addColumn(FAMILY2, F2COL2, Long.class);
- schema.setRowKey("rk", Integer.class);
- schema.addColumn(FAMILY3, F3COL1, Double.class);
- schema.addColumn(FAMILY3, F3COL2, Boolean.class);
- schema.addColumn(FAMILY3, F3COL3, String.class);
-
- Map<String, String> tableProperties = new HashMap<>();
- tableProperties.put("connector.type", "hbase");
- tableProperties.put("connector.version", "1.4.3");
- tableProperties.put("connector.property-version", "1");
- tableProperties.put("connector.table-name", TEST_TABLE_2);
- tableProperties.put("connector.zookeeper.quorum", getZookeeperQuorum());
- tableProperties.put("connector.zookeeper.znode.parent", "/hbase");
- DescriptorProperties descriptorProperties = new DescriptorProperties(true);
- descriptorProperties.putTableSchema(SCHEMA, schema.convertsToTableSchema());
- descriptorProperties.putProperties(tableProperties);
- TableSink tableSink = TableFactoryService
- .find(HBaseTableFactory.class, descriptorProperties.asMap())
- .createTableSink(descriptorProperties.asMap());
- ((TableEnvironmentInternal) tEnv).registerTableSinkInternal("hbase", tableSink);
- } else {
- tEnv.executeSql(
- "CREATE TABLE hbase (" +
- " family1 ROW<col1 INT>," +
- " family2 ROW<col1 STRING, col2 BIGINT>," +
- " rk INT," +
- " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>" +
- ") WITH (" +
- " 'connector' = 'hbase-1.4'," +
- " 'table-name' = '" + TEST_TABLE_1 + "'," +
- " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'," +
- " 'zookeeper.znode.parent' = '/hbase'" +
- ")");
- }
+ // register HBase table testTable1 which contains test data
+ String table1DDL = createHBaseTableDDL(TEST_TABLE_1, false);
+ tEnv.executeSql(table1DDL);
+
+ String table2DDL = createHBaseTableDDL(TEST_TABLE_2, false);
+ tEnv.executeSql(table2DDL);
- DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
- tEnv.createTemporaryView("src", ds);
+ String query = "INSERT INTO " + TEST_TABLE_2 + " SELECT" +
+ " rowkey," +
+ " family1," +
+ " family2," +
+ " family3" +
+ " FROM " + TEST_TABLE_1;
- String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
+ // wait to finish
TableEnvUtil.execInsertSqlAndWaitResult(tEnv, query);
// start a batch scan job to verify contents in HBase table
- // start a batch scan job to verify contents in HBase table
- TableEnvironment batchTableEnv = createBatchTableEnv();
-
- if (isLegacyConnector) {
- HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_2);
- hbaseTable.setRowKey("rowkey", Integer.class);
- hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
- hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
- hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
- hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
- hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
- hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
- ((TableEnvironmentInternal) batchTableEnv).registerTableSourceInternal("hTable", hbaseTable);
- } else {
- batchTableEnv.executeSql(
- "CREATE TABLE hTable (" +
- " rowkey INT," +
- " family1 ROW<col1 INT>," +
- " family2 ROW<col1 STRING, col2 BIGINT>," +
- " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>" +
- ") WITH (" +
- " 'connector' = 'hbase-1.4'," +
- " 'table-name' = '" + TEST_TABLE_1 + "'," +
- " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'" +
- ")");
- }
+ TableEnvironment batchEnv = createBatchTableEnv();
+ batchEnv.executeSql(table2DDL);
- Table table = batchTableEnv.sqlQuery(
+ Table table = batchEnv.sqlQuery(
"SELECT " +
" h.rowkey, " +
" h.family1.col1, " +
@@ -436,9 +339,8 @@ public class HBaseConnectorITCase extends HBaseTestBase {
" h.family3.col1, " +
" h.family3.col2, " +
" h.family3.col3 " +
- "FROM hTable AS h"
+ "FROM " + TEST_TABLE_2 + " AS h"
);
-
List<Row> results = collectBatchResult(table);
String expected =
"1,10,Hello-1,100,1.01,false,Welt-1\n" +
@@ -455,56 +357,37 @@ public class HBaseConnectorITCase extends HBaseTestBase {
@Test
public void testTableSourceSinkWithDDL() throws Exception {
+ if (OLD_PLANNER.equals(planner) || isLegacyConnector) {
+ // only test for blink planner and new connector, because types TIMESTAMP/DATE/TIME/DECIMAL works well in
+ // new connector(using blink-planner), but exits some precision problem in old planner or legacy connector.
+ return;
+ }
+
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
- DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
- tEnv.createTemporaryView("src", ds);
+ // regiter HBase table testTable1 which contains test data
+ String table1DDL = createHBaseTableDDL(TEST_TABLE_1, true);
+ tEnv.executeSql(table1DDL);
- // register hbase table
- String quorum = getZookeeperQuorum();
- String ddl;
- if (isLegacyConnector) {
- ddl = "CREATE TABLE hbase (\n" +
- " rowkey INT," +
- " family1 ROW<col1 INT>,\n" +
- " family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
- " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>,\n" +
- " family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3)>\n" +
- ") WITH (\n" +
- " 'connector.type' = 'hbase',\n" +
- " 'connector.version' = '1.4.3',\n" +
- " 'connector.table-name' = 'testTable3',\n" +
- " 'connector.zookeeper.quorum' = '" + quorum + "',\n" +
- " 'connector.zookeeper.znode.parent' = '/hbase' " +
- ")";
- } else {
- ddl = "CREATE TABLE hbase (\n" +
- " rowkey INT," +
- " family1 ROW<col1 INT>,\n" +
- " family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
- " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>,\n" +
- " family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3)>\n" +
- ") WITH (\n" +
- " 'connector' = 'hbase-1.4',\n" +
- " 'table-name' = 'testTable3',\n" +
- " 'zookeeper.quorum' = '" + quorum + "',\n" +
- " 'zookeeper.znode.parent' = '/hbase' " +
- ")";
- }
- tEnv.executeSql(ddl);
+ // register HBase table which is empty
+ String table3DDL = createHBaseTableDDL(TEST_TABLE_3, true);
+ tEnv.executeSql(table3DDL);
- String query = "INSERT INTO hbase " +
- "SELECT rowkey, ROW(f1c1), ROW(f2c1, f2c2), ROW(f3c1, f3c2, f3c3), ROW(f4c1, f4c2, f4c3) " +
- "FROM src";
- TableEnvUtil.execInsertSqlAndWaitResult(tEnv, query);
+ String insertStatement = "INSERT INTO " + TEST_TABLE_3 +
+ " SELECT rowkey," +
+ " family1," +
+ " family2," +
+ " family3," +
+ " family4" +
+ " from " + TEST_TABLE_1;
+ // wait to finish
+ TableEnvUtil.execInsertSqlAndWaitResult(tEnv, insertStatement);
// start a batch scan job to verify contents in HBase table
- TableEnvironment batchTableEnv = createBatchTableEnv();
- batchTableEnv.executeSql(ddl);
-
- Table table = batchTableEnv.sqlQuery(
- "SELECT " +
+ TableEnvironment batchEnv = createBatchTableEnv();
+ batchEnv.executeSql(table3DDL);
+ String query = "SELECT " +
" h.rowkey, " +
" h.family1.col1, " +
" h.family2.col1, " +
@@ -514,115 +397,103 @@ public class HBaseConnectorITCase extends HBaseTestBase {
" h.family3.col3, " +
" h.family4.col1, " +
" h.family4.col2, " +
- " h.family4.col3 " +
- "FROM hbase AS h"
- );
-
- List<Row> results = collectBatchResult(table);
- String expected =
- "1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18 19:00:00.0,2019-08-18,19:00:00\n" +
- "2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18 19:01:00.0,2019-08-18,19:01:00\n" +
- "3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18 19:02:00.0,2019-08-18,19:02:00\n" +
- "4,40,null,400,4.04,true,Welt-4,2019-08-18 19:03:00.0,2019-08-18,19:03:00\n" +
- "5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19 19:10:00.0,2019-08-19,19:10:00\n" +
- "6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19 19:20:00.0,2019-08-19,19:20:00\n" +
- "7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19 19:30:00.0,2019-08-19,19:30:00\n" +
- "8,80,null,800,8.08,true,Welt-8,2019-08-19 19:40:00.0,2019-08-19,19:40:00\n";
-
- TestBaseUtils.compareResultAsText(results, expected);
- }
-
+ " h.family4.col3, " +
+ " h.family4.col4 " +
+ " FROM " + TEST_TABLE_3 + " AS h";
+ Iterator<Row> collected = tEnv.executeSql(query).collect();
+ List<String> result = Lists.newArrayList(collected).stream()
+ .map(Row::toString)
+ .sorted()
+ .collect(Collectors.toList());
- // -------------------------------------------------------------------------------------
- // HBase lookup source tests
- // -------------------------------------------------------------------------------------
-
- // prepare a source collection.
- private static final List<Row> testData2 = new ArrayList<>();
- private static final RowTypeInfo testTypeInfo2 = new RowTypeInfo(
- new TypeInformation[]{Types.INT, Types.LONG, Types.STRING},
- new String[]{"a", "b", "c"});
-
- static {
- testData2.add(Row.of(1, 1L, "Hi"));
- testData2.add(Row.of(2, 2L, "Hello"));
- testData2.add(Row.of(3, 2L, "Hello world"));
- testData2.add(Row.of(3, 3L, "Hello world!"));
+ List<String> expected = new ArrayList<>();
+ expected.add("1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18T19:00,2019-08-18,19:00,12345678.0001");
+ expected.add("2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18T19:01,2019-08-18,19:01,12345678.0002");
+ expected.add("3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003");
+ expected.add("4,40,null,400,4.04,true,Welt-4,2019-08-18T19:03,2019-08-18,19:03,12345678.0004");
+ expected.add("5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19T19:10,2019-08-19,19:10,12345678.0005");
+ expected.add("6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19T19:20,2019-08-19,19:20,12345678.0006");
+ expected.add("7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19T19:30,2019-08-19,19:30,12345678.0007");
+ expected.add("8,80,null,800,8.08,true,Welt-8,2019-08-19T19:40,2019-08-19,19:40,12345678.0008");
+ assertEquals(expected, result);
}
@Test
public void testHBaseLookupTableSource() throws Exception {
- if (OLD_PLANNER.equals(planner)) {
+ if (OLD_PLANNER.equals(planner) || isLegacyConnector) {
// lookup table source is only supported in blink planner, skip for old planner
+ // types TIMESTAMP/DATE/TIME/DECIMAL works well in new connector, skip legacy connector
return;
}
- StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
- StreamITCase.clear();
+
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+ tEnv.executeSql(
+ "CREATE TABLE " + TEST_TABLE_1 + " (" +
+ " family1 ROW<col1 INT>," +
+ " family2 ROW<col1 STRING, col2 BIGINT>," +
+ " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>," +
+ " rowkey INT," +
+ " family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3), col4 DECIMAL(12, 4)>," +
+ " PRIMARY KEY (rowkey) NOT ENFORCED" +
+ ") WITH (" +
+ " 'connector' = 'hbase-1.4'," +
+ " 'table-name' = '" + TEST_TABLE_1 + "'," +
+ " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'" +
+ ")");
// prepare a source table
String srcTableName = "src";
- DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2);
- Table in = streamTableEnv.fromDataStream(ds, $("a"), $("b"), $("c"), $("proc").proctime());
- streamTableEnv.registerTable(srcTableName, in);
+ DataStream<Row> srcDs = execEnv.fromCollection(testData).returns(testTypeInfo);
+ Table in = tEnv.fromDataStream(srcDs, $("a"), $("b"), $("c"), $("proc").proctime());
+ tEnv.registerTable(srcTableName, in);
- if (isLegacyConnector) {
- Map<String, String> tableProperties = hbaseTableProperties();
- TableSource<?> source = TableFactoryService
- .find(HBaseTableFactory.class, tableProperties)
- .createTableSource(tableProperties);
- ((TableEnvironmentInternal) streamTableEnv).registerTableSourceInternal("hbaseLookup", source);
- } else {
- streamTableEnv.executeSql(
- "CREATE TABLE hbaseLookup (" +
- " family1 ROW<col1 INT>," +
- " rk INT," +
- " family2 ROW<col1 STRING, col2 BIGINT>," +
- " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>" +
- ") WITH (" +
- " 'connector' = 'hbase-1.4'," +
- " 'table-name' = '" + TEST_TABLE_1 + "'," +
- " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'" +
- ")");
- }
// perform a temporal table join query
- String query = "SELECT a,family1.col1, family3.col3 FROM src " +
- "JOIN hbaseLookup FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rk";
- Table result = streamTableEnv.sqlQuery(query);
-
- DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
- resultSet.addSink(new StreamITCase.StringSink<>());
-
- streamEnv.execute();
+ String dimJoinQuery = "SELECT" +
+ " a," +
+ " b," +
+ " h.family1.col1," +
+ " h.family2.col1," +
+ " h.family2.col2," +
+ " h.family3.col1," +
+ " h.family3.col2," +
+ " h.family3.col3," +
+ " h.family4.col1," +
+ " h.family4.col2," +
+ " h.family4.col3," +
+ " h.family4.col4 " +
+ " FROM src JOIN " + TEST_TABLE_1 + " FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rowkey";
+ Iterator<Row> collected = tEnv.executeSql(dimJoinQuery).collect();
+ List<String> result = Lists.newArrayList(collected).stream()
+ .map(Row::toString)
+ .sorted()
+ .collect(Collectors.toList());
List<String> expected = new ArrayList<>();
- expected.add("1,10,Welt-1");
- expected.add("2,20,Welt-2");
- expected.add("3,30,Welt-3");
- expected.add("3,30,Welt-3");
+ expected.add("1,1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18T19:00,2019-08-18,19:00,12345678.0001");
+ expected.add("2,2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18T19:01,2019-08-18,19:01,12345678.0002");
+ expected.add("3,2,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003");
+ expected.add("3,3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003");
- StreamITCase.compareWithList(expected);
+ assertEquals(expected, result);
}
- private static Map<String, String> hbaseTableProperties() {
- Map<String, String> properties = new HashMap<>();
- properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
- properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
- properties.put(CONNECTOR_PROPERTY_VERSION, "1");
- properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1);
- properties.put(CONNECTOR_ZK_QUORUM, getZookeeperQuorum());
- // schema
- String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
- TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
- TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG);
- TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
- TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3};
-
- DescriptorProperties descriptorProperties = new DescriptorProperties(true);
- TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
- descriptorProperties.putTableSchema(SCHEMA, tableSchema);
- descriptorProperties.putProperties(properties);
- return descriptorProperties.asMap();
+ // -------------------------------------------------------------------------------------
+ // HBase lookup source tests
+ // -------------------------------------------------------------------------------------
+
+ // prepare a source collection.
+ private static final List<Row> testData = new ArrayList<>();
+ private static final RowTypeInfo testTypeInfo = new RowTypeInfo(
+ new TypeInformation[]{Types.INT, Types.LONG, Types.STRING},
+ new String[]{"a", "b", "c"});
+
+ static {
+ testData.add(Row.of(1, 1L, "Hi"));
+ testData.add(Row.of(2, 2L, "Hello"));
+ testData.add(Row.of(3, 2L, "Hello world"));
+ testData.add(Row.of(3, 3L, "Hello world!"));
}
// ------------------------------- Utilities -------------------------------------------------
@@ -721,4 +592,42 @@ public class HBaseConnectorITCase extends HBaseTestBase {
}
}
+ private String createHBaseTableDDL(String tableName, boolean testTimeAndDecimalTypes) {
+ StringBuilder family4Statement = new StringBuilder();
+ if (testTimeAndDecimalTypes) {
+ family4Statement.append(", family4 ROW<col1 TIMESTAMP(3)");
+ family4Statement.append(", col2 DATE");
+ family4Statement.append(", col3 TIME(3)");
+ family4Statement.append(", col4 DECIMAL(12, 4)");
+ family4Statement.append("> \n");
+ }
+ if (isLegacyConnector) {
+ return "CREATE TABLE " + tableName + "(\n" +
+ " rowkey INT,\n" +
+ " family1 ROW<col1 INT>,\n" +
+ " family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
+ " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>" +
+ family4Statement.toString() +
+ ") WITH (\n" +
+ " 'connector.type' = 'hbase',\n" +
+ " 'connector.version' = '1.4.3',\n" +
+ " 'connector.table-name' = '" + tableName + "',\n" +
+ " 'connector.zookeeper.quorum' = '" + getZookeeperQuorum() + "',\n" +
+ " 'connector.zookeeper.znode.parent' = '/hbase' " +
+ ")";
+ } else {
+ return "CREATE TABLE " + tableName + "(\n" +
+ " rowkey INT," +
+ " family1 ROW<col1 INT>,\n" +
+ " family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
+ " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>" +
+ family4Statement.toString() +
+ ") WITH (\n" +
+ " 'connector' = 'hbase-1.4',\n" +
+ " 'table-name' = '" + tableName + "',\n" +
+ " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "',\n" +
+ " 'zookeeper.znode.parent' = '/hbase' " +
+ ")";
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
index 061114e..47775ac 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.connector.hbase.options.HBaseOptions;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.sink.HBaseDynamicTableSink;
import org.apache.flink.connector.hbase.source.HBaseDynamicTableSource;
-import org.apache.flink.connector.hbase.source.HBaseLookupFunction;
+import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -111,8 +111,8 @@ public class HBaseDynamicTableFactoryTest {
assertTrue(lookupProvider instanceof TableFunctionProvider);
TableFunction tableFunction = ((TableFunctionProvider) lookupProvider).createTableFunction();
- assertTrue(tableFunction instanceof HBaseLookupFunction);
- assertEquals("testHBastTable", ((HBaseLookupFunction) tableFunction).getHTableName());
+ assertTrue(tableFunction instanceof HBaseRowDataLookupFunction);
+ assertEquals("testHBastTable", ((HBaseRowDataLookupFunction) tableFunction).getHTableName());
HBaseTableSchema hbaseSchema = hbaseSource.getHBaseTableSchema();
assertEquals(2, hbaseSchema.getRowKeyIndex());
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
index e00b986..c376b95 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
@@ -28,9 +28,17 @@ import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.dateToInternal;
+import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timeToInternal;
+import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToInternal;
+
/**
* Abstract IT case class for HBase.
*/
@@ -40,7 +48,8 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
- protected static final String ROWKEY = "rk";
+ protected static final String ROW_KEY = "rowkey";
+
protected static final String FAMILY1 = "family1";
protected static final String F1COL1 = "col1";
@@ -54,11 +63,16 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String F3COL3 = "col3";
protected static final String FAMILY4 = "family4";
+ protected static final String F4COL1 = "col1";
+ protected static final String F4COL2 = "col2";
+ protected static final String F4COL3 = "col3";
+ protected static final String F4COL4 = "col4";
private static final byte[][] FAMILIES = new byte[][]{
Bytes.toBytes(FAMILY1),
Bytes.toBytes(FAMILY2),
- Bytes.toBytes(FAMILY3)
+ Bytes.toBytes(FAMILY3),
+ Bytes.toBytes(FAMILY4)
};
private static final byte[][] SPLIT_KEYS = new byte[][]{Bytes.toBytes(4)};
@@ -107,14 +121,30 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
HTable table = openTable(tableName);
List<Put> puts = new ArrayList<>();
// add some data
- puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
- puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
- puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
- puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
- puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
- puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
- puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
- puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
+ puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
+ Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"),
+ Time.valueOf("19:00:00"), new BigDecimal(12345678.0001)));
+ puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
+ Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00"),
+ new BigDecimal(12345678.0002)));
+ puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
+ Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00"),
+ new BigDecimal(12345678.0003)));
+ puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4",
+ Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00"),
+ new BigDecimal(12345678.0004)));
+ puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
+ Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00"),
+ new BigDecimal(12345678.0005)));
+ puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
+ Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00"),
+ new BigDecimal(12345678.0006)));
+ puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
+ Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00"),
+ new BigDecimal(12345678.0007)));
+ puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8",
+ Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00"),
+ new BigDecimal(12345678.0008)));
// append rows to table
table.put(puts);
@@ -139,7 +169,18 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
createTable(tableName, families, SPLIT_KEYS);
}
- private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
+ private static Put putRow(
+ int rowKey,
+ int f1c1,
+ String f2c1,
+ long f2c2,
+ double f3c1,
+ boolean f3c2,
+ String f3c3,
+ Timestamp f4c1,
+ Date f4c2,
+ Time f4c3,
+ BigDecimal f4c4) {
Put put = new Put(Bytes.toBytes(rowKey));
// family 1
put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
@@ -153,6 +194,11 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
+ // family 4
+ put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL1), Bytes.toBytes(timestampToInternal(f4c1)));
+ put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL2), Bytes.toBytes(dateToInternal(f4c2)));
+ put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL3), Bytes.toBytes(timeToInternal(f4c3)));
+ put.addColumn(Bytes.toBytes(FAMILY4), Bytes.toBytes(F4COL4), Bytes.toBytes(f4c4));
return put;
}
}