You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/19 16:22:56 UTC

[flink] 02/02: [FLINK-13699][hbase] Add integration test for HBase to verify DDL with TIMESTAMP types

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95ba5408833fa38aba5624be1fa88fee342cdd1b
Author: Jark Wu <im...@gmail.com>
AuthorDate: Mon Aug 19 21:01:59 2019 +0800

    [FLINK-13699][hbase] Add integration test for HBase to verify DDL with TIMESTAMP types
---
 .../flink/addons/hbase/util/HBaseTypeUtils.java    |   3 +-
 .../flink/addons/hbase/HBaseConnectorITCase.java   | 128 +++++++++++++++++++--
 .../flink/addons/hbase/util/HBaseTestBase.java     |  16 +++
 3 files changed, 133 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
index 83c9eb2..73f393f 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.addons.hbase.util;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
-import org.apache.commons.net.ntp.TimeStamp;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.math.BigDecimal;
@@ -102,7 +101,7 @@ public class HBaseTypeUtils {
 			case 8:
 				return Bytes.toBytes((boolean) value);
 			case 9: // sql.Timestamp encoded to Long
-				return Bytes.toBytes(((TimeStamp) value).getTime());
+				return Bytes.toBytes(((Timestamp) value).getTime());
 			case 10: // sql.Date encoded as long
 				return Bytes.toBytes(((Date) value).getTime());
 			case 11: // sql.Time encoded as long
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index 6a0909e..d7231ab 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
@@ -38,7 +39,9 @@ import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.planner.runtime.utils.TableUtil;
+import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil;
+import org.apache.flink.table.planner.sinks.CollectRowTableSink;
+import org.apache.flink.table.planner.sinks.CollectTableSink;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.utils.StreamITCase;
 import org.apache.flink.table.sinks.TableSink;
@@ -53,11 +56,16 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import scala.Option;
+
 import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
@@ -228,18 +236,27 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 	// prepare a source collection.
 	private static final List<Row> testData1 = new ArrayList<>();
 	private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
-		new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
-		new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"});
+		new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE,
+			Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.SQL_TIME},
+		new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3", "f4c1", "f4c2", "f4c3"});
 
 	static {
-		testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-		testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-		testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-		testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
-		testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-		testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-		testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-		testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
+		testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
+			Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:00:00")));
+		testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
+			Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00")));
+		testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
+			Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00")));
+		testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4",
+			Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00")));
+		testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
+			Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00")));
+		testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
+			Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00")));
+		testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
+			Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00")));
+		testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8",
+			Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00")));
 	}
 
 	@Test
@@ -320,6 +337,72 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
+	@Test
+	public void testTableSourceSinkWithDDL() throws Exception {
+		StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+		DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
+		tEnv.registerDataStream("src", ds);
+
+		// register hbase table
+		String quorum = getZookeeperQuorum();
+		String ddl = "CREATE TABLE hbase (\n" +
+			"    rowkey INT," +
+			"    family1 ROW<col1 INT>,\n" +
+			"    family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
+			"    family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>,\n" +
+			"    family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3)>\n" +
+			") WITH (\n" +
+			"    'connector.type' = 'hbase',\n" +
+			"    'connector.version' = '1.4.3',\n" +
+			"    'connector.table-name' = 'testTable3',\n" +
+			"    'connector.zookeeper.quorum' = '" + quorum + "',\n" +
+			"    'connector.zookeeper.znode.parent' = '/hbase' " +
+			")";
+		tEnv.sqlUpdate(ddl);
+
+		String query = "INSERT INTO hbase " +
+			"SELECT rowkey, ROW(f1c1), ROW(f2c1, f2c2), ROW(f3c1, f3c2, f3c3), ROW(f4c1, f4c2, f4c3) " +
+			"FROM src";
+		tEnv.sqlUpdate(query);
+
+		// wait to finish
+		tEnv.execute("HBase Job");
+
+		// start a batch scan job to verify contents in HBase table
+		TableEnvironment batchTableEnv = createBatchTableEnv();
+		batchTableEnv.sqlUpdate(ddl);
+
+		Table table = batchTableEnv.sqlQuery(
+			"SELECT " +
+				"  h.rowkey, " +
+				"  h.family1.col1, " +
+				"  h.family2.col1, " +
+				"  h.family2.col2, " +
+				"  h.family3.col1, " +
+				"  h.family3.col2, " +
+				"  h.family3.col3, " +
+				"  h.family4.col1, " +
+				"  h.family4.col2, " +
+				"  h.family4.col3 " +
+				"FROM hbase AS h"
+		);
+
+		List<Row> results = collectBatchResult(table);
+		String expected =
+				"1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18 19:00:00.0,2019-08-18,19:00:00\n" +
+				"2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18 19:01:00.0,2019-08-18,19:01:00\n" +
+				"3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18 19:02:00.0,2019-08-18,19:02:00\n" +
+				"4,40,,400,4.04,true,Welt-4,2019-08-18 19:03:00.0,2019-08-18,19:03:00\n" +
+				"5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19 19:10:00.0,2019-08-19,19:10:00\n" +
+				"6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19 19:20:00.0,2019-08-19,19:20:00\n" +
+				"7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19 19:30:00.0,2019-08-19,19:30:00\n" +
+				"8,80,,800,8.08,true,Welt-8,2019-08-19 19:40:00.0,2019-08-19,19:40:00\n";
+
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
 
 	// -------------------------------------------------------------------------------------
 	// HBase lookup source tests
@@ -461,7 +544,28 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 			DataSet<Row> resultSet = batchTableEnv.toDataSet(table, Row.class);
 			return resultSet.collect();
 		} else {
-			return JavaScalaConversionUtil.toJava(TableUtil.collect(tableImpl));
+			TableImpl t = (TableImpl) table;
+			TableSchema schema = t.getSchema();
+			List<TypeInformation> types = new ArrayList<>();
+			for (TypeInformation typeInfo : t.getSchema().getFieldTypes()) {
+				// convert LOCAL_DATE_TIME to legacy TIMESTAMP to make the output consistent with flink batch planner
+				if (typeInfo.equals(Types.LOCAL_DATE_TIME)) {
+					types.add(Types.SQL_TIMESTAMP);
+				} else if (typeInfo.equals(Types.LOCAL_DATE)) {
+					types.add(Types.SQL_DATE);
+				} else if (typeInfo.equals(Types.LOCAL_TIME)) {
+					types.add(Types.SQL_TIME);
+				} else {
+					types.add(typeInfo);
+				}
+			}
+			CollectRowTableSink sink = new CollectRowTableSink();
+			CollectTableSink<Row> configuredSink = (CollectTableSink<Row>) sink.configure(
+				schema.getFieldNames(), types.toArray(new TypeInformation[0]));
+			return JavaScalaConversionUtil.toJava(
+				BatchTableEnvUtil.collect(
+					t.getTableEnvironment(), table, configuredSink, Option.apply("JOB"),
+					EnvironmentSettings.DEFAULT_BUILTIN_CATALOG, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE));
 		}
 	}
 
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
index 32baf0c..985fe19 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 
 	protected static final String TEST_TABLE_1 = "testTable1";
 	protected static final String TEST_TABLE_2 = "testTable2";
+	protected static final String TEST_TABLE_3 = "testTable3";
 
 	protected static final String ROWKEY = "rk";
 	protected static final String FAMILY1 = "family1";
@@ -58,6 +59,8 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 	protected static final String F3COL2 = "col2";
 	protected static final String F3COL3 = "col3";
 
+	protected static final String FAMILY4 = "family4";
+
 	private static final byte[][] FAMILIES = new byte[][]{
 		Bytes.toBytes(FAMILY1),
 		Bytes.toBytes(FAMILY2),
@@ -100,6 +103,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 	private static void prepareTables() throws IOException {
 		createHBaseTable1();
 		createHBaseTable2();
+		createHBaseTable3();
 	}
 
 	private static void createHBaseTable1() throws IOException {
@@ -131,6 +135,18 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 		createTable(tableName, FAMILIES, SPLIT_KEYS);
 	}
 
+	private static void createHBaseTable3() {
+		// create a table
+		byte[][] families = new byte[][]{
+			Bytes.toBytes(FAMILY1),
+			Bytes.toBytes(FAMILY2),
+			Bytes.toBytes(FAMILY3),
+			Bytes.toBytes(FAMILY4),
+		};
+		TableName tableName = TableName.valueOf(TEST_TABLE_3);
+		createTable(tableName, families, SPLIT_KEYS);
+	}
+
 	private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
 		Put put = new Put(Bytes.toBytes(rowKey));
 		// family 1