You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/19 16:22:56 UTC
[flink] 02/02: [FLINK-13699][hbase] Add integration test for HBase
to verify DDL with TIMESTAMP types
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 95ba5408833fa38aba5624be1fa88fee342cdd1b
Author: Jark Wu <im...@gmail.com>
AuthorDate: Mon Aug 19 21:01:59 2019 +0800
[FLINK-13699][hbase] Add integration test for HBase to verify DDL with TIMESTAMP types
---
.../flink/addons/hbase/util/HBaseTypeUtils.java | 3 +-
.../flink/addons/hbase/HBaseConnectorITCase.java | 128 +++++++++++++++++++--
.../flink/addons/hbase/util/HBaseTestBase.java | 16 +++
3 files changed, 133 insertions(+), 14 deletions(-)
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
index 83c9eb2..73f393f 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.addons.hbase.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.commons.net.ntp.TimeStamp;
import org.apache.hadoop.hbase.util.Bytes;
import java.math.BigDecimal;
@@ -102,7 +101,7 @@ public class HBaseTypeUtils {
case 8:
return Bytes.toBytes((boolean) value);
case 9: // sql.Timestamp encoded to Long
- return Bytes.toBytes(((TimeStamp) value).getTime());
+ return Bytes.toBytes(((Timestamp) value).getTime());
case 10: // sql.Date encoded as long
return Bytes.toBytes(((Date) value).getTime());
case 11: // sql.Time encoded as long
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index 6a0909e..d7231ab 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
@@ -38,7 +39,9 @@ import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.planner.runtime.utils.TableUtil;
+import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil;
+import org.apache.flink.table.planner.sinks.CollectRowTableSink;
+import org.apache.flink.table.planner.sinks.CollectTableSink;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.utils.StreamITCase;
import org.apache.flink.table.sinks.TableSink;
@@ -53,11 +56,16 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import scala.Option;
+
import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
@@ -228,18 +236,27 @@ public class HBaseConnectorITCase extends HBaseTestBase {
// prepare a source collection.
private static final List<Row> testData1 = new ArrayList<>();
private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
- new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
- new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"});
+ new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE,
+ Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.SQL_TIME},
+ new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3", "f4c1", "f4c2", "f4c3"});
static {
- testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
- testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
- testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
- testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
- testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
- testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
- testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
- testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
+ testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
+ Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:00:00")));
+ testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
+ Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00")));
+ testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
+ Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00")));
+ testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4",
+ Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00")));
+ testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
+ Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00")));
+ testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
+ Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00")));
+ testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
+ Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00")));
+ testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8",
+ Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00")));
}
@Test
@@ -320,6 +337,72 @@ public class HBaseConnectorITCase extends HBaseTestBase {
TestBaseUtils.compareResultAsText(results, expected);
}
+ @Test
+ public void testTableSourceSinkWithDDL() throws Exception {
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+ DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
+ tEnv.registerDataStream("src", ds);
+
+ // register the HBase table "testTable3" via DDL; family4 holds the TIMESTAMP(3)/DATE/TIME(3) columns this test is about
+ String quorum = getZookeeperQuorum();
+ String ddl = "CREATE TABLE hbase (\n" +
+ " rowkey INT," +
+ " family1 ROW<col1 INT>,\n" +
+ " family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
+ " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>,\n" +
+ " family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3)>\n" +
+ ") WITH (\n" +
+ " 'connector.type' = 'hbase',\n" +
+ " 'connector.version' = '1.4.3',\n" +
+ " 'connector.table-name' = 'testTable3',\n" +
+ " 'connector.zookeeper.quorum' = '" + quorum + "',\n" +
+ " 'connector.zookeeper.znode.parent' = '/hbase' " +
+ ")";
+ tEnv.sqlUpdate(ddl);
+
+ String query = "INSERT INTO hbase " +
+ "SELECT rowkey, ROW(f1c1), ROW(f2c1, f2c2), ROW(f3c1, f3c2, f3c3), ROW(f4c1, f4c2, f4c3) " +
+ "FROM src";
+ tEnv.sqlUpdate(query);
+
+ // execute the INSERT job and wait for it to finish before scanning the table
+ tEnv.execute("HBase Job");
+
+ // start a batch scan job, registering the same DDL, to verify that the rows written above can be read back from HBase
+ TableEnvironment batchTableEnv = createBatchTableEnv();
+ batchTableEnv.sqlUpdate(ddl);
+
+ Table table = batchTableEnv.sqlQuery(
+ "SELECT " +
+ " h.rowkey, " +
+ " h.family1.col1, " +
+ " h.family2.col1, " +
+ " h.family2.col2, " +
+ " h.family3.col1, " +
+ " h.family3.col2, " +
+ " h.family3.col3, " +
+ " h.family4.col1, " +
+ " h.family4.col2, " +
+ " h.family4.col3 " +
+ "FROM hbase AS h"
+ );
+
+ List<Row> results = collectBatchResult(table);
+ String expected =
+ "1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18 19:00:00.0,2019-08-18,19:00:00\n" +
+ "2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18 19:01:00.0,2019-08-18,19:01:00\n" +
+ "3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18 19:02:00.0,2019-08-18,19:02:00\n" +
+ "4,40,,400,4.04,true,Welt-4,2019-08-18 19:03:00.0,2019-08-18,19:03:00\n" +
+ "5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19 19:10:00.0,2019-08-19,19:10:00\n" +
+ "6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19 19:20:00.0,2019-08-19,19:20:00\n" +
+ "7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19 19:30:00.0,2019-08-19,19:30:00\n" +
+ "8,80,,800,8.08,true,Welt-8,2019-08-19 19:40:00.0,2019-08-19,19:40:00\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
// -------------------------------------------------------------------------------------
// HBase lookup source tests
@@ -461,7 +544,28 @@ public class HBaseConnectorITCase extends HBaseTestBase {
DataSet<Row> resultSet = batchTableEnv.toDataSet(table, Row.class);
return resultSet.collect();
} else {
- return JavaScalaConversionUtil.toJava(TableUtil.collect(tableImpl));
+ TableImpl t = (TableImpl) table;
+ TableSchema schema = t.getSchema();
+ List<TypeInformation> types = new ArrayList<>();
+ for (TypeInformation typeInfo : t.getSchema().getFieldTypes()) {
+ // convert LOCAL_DATE_TIME/LOCAL_DATE/LOCAL_TIME to the legacy SQL_TIMESTAMP/SQL_DATE/SQL_TIME types to make the output consistent with the flink batch planner
+ if (typeInfo.equals(Types.LOCAL_DATE_TIME)) {
+ types.add(Types.SQL_TIMESTAMP);
+ } else if (typeInfo.equals(Types.LOCAL_DATE)) {
+ types.add(Types.SQL_DATE);
+ } else if (typeInfo.equals(Types.LOCAL_TIME)) {
+ types.add(Types.SQL_TIME);
+ } else {
+ types.add(typeInfo);
+ }
+ }
+ CollectRowTableSink sink = new CollectRowTableSink();
+ CollectTableSink<Row> configuredSink = (CollectTableSink<Row>) sink.configure(
+ schema.getFieldNames(), types.toArray(new TypeInformation[0]));
+ return JavaScalaConversionUtil.toJava(
+ BatchTableEnvUtil.collect(
+ t.getTableEnvironment(), table, configuredSink, Option.apply("JOB"),
+ EnvironmentSettings.DEFAULT_BUILTIN_CATALOG, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE));
}
}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
index 32baf0c..985fe19 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_1 = "testTable1";
protected static final String TEST_TABLE_2 = "testTable2";
+ protected static final String TEST_TABLE_3 = "testTable3";
protected static final String ROWKEY = "rk";
protected static final String FAMILY1 = "family1";
@@ -58,6 +59,8 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String F3COL2 = "col2";
protected static final String F3COL3 = "col3";
+ protected static final String FAMILY4 = "family4";
+
private static final byte[][] FAMILIES = new byte[][]{
Bytes.toBytes(FAMILY1),
Bytes.toBytes(FAMILY2),
@@ -100,6 +103,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
private static void prepareTables() throws IOException {
createHBaseTable1();
createHBaseTable2();
+ createHBaseTable3();
}
private static void createHBaseTable1() throws IOException {
@@ -131,6 +135,18 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
createTable(tableName, FAMILIES, SPLIT_KEYS);
}
+ private static void createHBaseTable3() {
+ // create TEST_TABLE_3 with four column families (family1..family4), passing the shared SPLIT_KEYS to createTable
+ byte[][] families = new byte[][]{
+ Bytes.toBytes(FAMILY1),
+ Bytes.toBytes(FAMILY2),
+ Bytes.toBytes(FAMILY3),
+ Bytes.toBytes(FAMILY4),
+ };
+ TableName tableName = TableName.valueOf(TEST_TABLE_3);
+ createTable(tableName, families, SPLIT_KEYS);
+ }
+
private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
Put put = new Put(Bytes.toBytes(rowKey));
// family 1