Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/19 16:22:54 UTC

[flink] branch release-1.9 updated (b3ce2c7 -> 95ba540)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b3ce2c7  [FLINK-13231] [pubsub] Replace Max outstanding acknowledgement ids limit with a FlinkConnectorRateLimiter
     new dca0879  [FLINK-13699][table-api] Fix TableFactory doesn't work with DDL when containing TIMESTAMP/DATE/TIME types
     new 95ba540  [FLINK-13699][hbase] Add integration test for HBase to verify DDL with TIMESTAMP types

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/addons/hbase/util/HBaseTypeUtils.java    |   3 +-
 .../flink/addons/hbase/HBaseConnectorITCase.java   | 128 +++++++++++++++++++--
 .../flink/addons/hbase/util/HBaseTestBase.java     |  16 +++
 .../apache/flink/table/utils/TypeStringUtils.java  |   9 +-
 .../flink/table/utils/TypeStringUtilsTest.java     |  13 +++
 5 files changed, 152 insertions(+), 17 deletions(-)


[flink] 02/02: [FLINK-13699][hbase] Add integration test for HBase to verify DDL with TIMESTAMP types

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95ba5408833fa38aba5624be1fa88fee342cdd1b
Author: Jark Wu <im...@gmail.com>
AuthorDate: Mon Aug 19 21:01:59 2019 +0800

    [FLINK-13699][hbase] Add integration test for HBase to verify DDL with TIMESTAMP types
---
 .../flink/addons/hbase/util/HBaseTypeUtils.java    |   3 +-
 .../flink/addons/hbase/HBaseConnectorITCase.java   | 128 +++++++++++++++++++--
 .../flink/addons/hbase/util/HBaseTestBase.java     |  16 +++
 3 files changed, 133 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
index 83c9eb2..73f393f 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseTypeUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.addons.hbase.util;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
-import org.apache.commons.net.ntp.TimeStamp;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.math.BigDecimal;
@@ -102,7 +101,7 @@ public class HBaseTypeUtils {
 			case 8:
 				return Bytes.toBytes((boolean) value);
 			case 9: // sql.Timestamp encoded to Long
-				return Bytes.toBytes(((TimeStamp) value).getTime());
+				return Bytes.toBytes(((Timestamp) value).getTime());
 			case 10: // sql.Date encoded as long
 				return Bytes.toBytes(((Date) value).getTime());
 			case 11: // sql.Time encoded as long
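
For context: the removed import pulled in org.apache.commons.net.ntp.TimeStamp, so the case-9 cast targeted the NTP class and would throw a ClassCastException for java.sql.Timestamp values. A minimal standalone sketch of the intended encoding path (illustration only, not part of the commit; assumes HBase's Bytes utility on the classpath):

    import org.apache.hadoop.hbase.util.Bytes;

    import java.sql.Timestamp;

    public class TimestampEncodingSketch {

        public static void main(String[] args) {
            // A java.sql.Timestamp is stored as its epoch-millis long value.
            Timestamp ts = Timestamp.valueOf("2019-08-18 19:00:00");
            byte[] encoded = Bytes.toBytes(ts.getTime());

            // Reading it back: decode the long and rebuild the Timestamp.
            Timestamp decoded = new Timestamp(Bytes.toLong(encoded));
            System.out.println(decoded); // prints 2019-08-18 19:00:00.0
        }
    }
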
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index 6a0909e..d7231ab 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
@@ -38,7 +39,9 @@ import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.planner.runtime.utils.TableUtil;
+import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil;
+import org.apache.flink.table.planner.sinks.CollectRowTableSink;
+import org.apache.flink.table.planner.sinks.CollectTableSink;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.utils.StreamITCase;
 import org.apache.flink.table.sinks.TableSink;
@@ -53,11 +56,16 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import scala.Option;
+
 import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
@@ -228,18 +236,27 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 	// prepare a source collection.
 	private static final List<Row> testData1 = new ArrayList<>();
 	private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
-		new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
-		new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"});
+		new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE,
+			Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.SQL_TIME},
+		new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3", "f4c1", "f4c2", "f4c3"});
 
 	static {
-		testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-		testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-		testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-		testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
-		testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-		testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-		testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-		testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
+		testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
+			Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:00:00")));
+		testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
+			Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00")));
+		testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
+			Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00")));
+		testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4",
+			Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00")));
+		testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
+			Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00")));
+		testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
+			Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00")));
+		testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
+			Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00")));
+		testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8",
+			Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00")));
 	}
 
 	@Test
@@ -320,6 +337,72 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
+	@Test
+	public void testTableSourceSinkWithDDL() throws Exception {
+		StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+		DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
+		tEnv.registerDataStream("src", ds);
+
+		// register hbase table
+		String quorum = getZookeeperQuorum();
+		String ddl = "CREATE TABLE hbase (\n" +
+			"    rowkey INT," +
+			"    family1 ROW<col1 INT>,\n" +
+			"    family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
+			"    family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>,\n" +
+			"    family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3)>\n" +
+			") WITH (\n" +
+			"    'connector.type' = 'hbase',\n" +
+			"    'connector.version' = '1.4.3',\n" +
+			"    'connector.table-name' = 'testTable3',\n" +
+			"    'connector.zookeeper.quorum' = '" + quorum + "',\n" +
+			"    'connector.zookeeper.znode.parent' = '/hbase' " +
+			")";
+		tEnv.sqlUpdate(ddl);
+
+		String query = "INSERT INTO hbase " +
+			"SELECT rowkey, ROW(f1c1), ROW(f2c1, f2c2), ROW(f3c1, f3c2, f3c3), ROW(f4c1, f4c2, f4c3) " +
+			"FROM src";
+		tEnv.sqlUpdate(query);
+
+		// wait to finish
+		tEnv.execute("HBase Job");
+
+		// start a batch scan job to verify contents in HBase table
+		TableEnvironment batchTableEnv = createBatchTableEnv();
+		batchTableEnv.sqlUpdate(ddl);
+
+		Table table = batchTableEnv.sqlQuery(
+			"SELECT " +
+				"  h.rowkey, " +
+				"  h.family1.col1, " +
+				"  h.family2.col1, " +
+				"  h.family2.col2, " +
+				"  h.family3.col1, " +
+				"  h.family3.col2, " +
+				"  h.family3.col3, " +
+				"  h.family4.col1, " +
+				"  h.family4.col2, " +
+				"  h.family4.col3 " +
+				"FROM hbase AS h"
+		);
+
+		List<Row> results = collectBatchResult(table);
+		String expected =
+				"1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18 19:00:00.0,2019-08-18,19:00:00\n" +
+				"2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18 19:01:00.0,2019-08-18,19:01:00\n" +
+				"3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18 19:02:00.0,2019-08-18,19:02:00\n" +
+				"4,40,,400,4.04,true,Welt-4,2019-08-18 19:03:00.0,2019-08-18,19:03:00\n" +
+				"5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19 19:10:00.0,2019-08-19,19:10:00\n" +
+				"6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19 19:20:00.0,2019-08-19,19:20:00\n" +
+				"7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19 19:30:00.0,2019-08-19,19:30:00\n" +
+				"8,80,,800,8.08,true,Welt-8,2019-08-19 19:40:00.0,2019-08-19,19:40:00\n";
+
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
 
 	// -------------------------------------------------------------------------------------
 	// HBase lookup source tests
@@ -461,7 +544,28 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 			DataSet<Row> resultSet = batchTableEnv.toDataSet(table, Row.class);
 			return resultSet.collect();
 		} else {
-			return JavaScalaConversionUtil.toJava(TableUtil.collect(tableImpl));
+			TableImpl t = (TableImpl) table;
+			TableSchema schema = t.getSchema();
+			List<TypeInformation> types = new ArrayList<>();
+			for (TypeInformation typeInfo : t.getSchema().getFieldTypes()) {
+				// convert LOCAL_DATE_TIME to legacy TIMESTAMP to make the output consistent with flink batch planner
+				if (typeInfo.equals(Types.LOCAL_DATE_TIME)) {
+					types.add(Types.SQL_TIMESTAMP);
+				} else if (typeInfo.equals(Types.LOCAL_DATE)) {
+					types.add(Types.SQL_DATE);
+				} else if (typeInfo.equals(Types.LOCAL_TIME)) {
+					types.add(Types.SQL_TIME);
+				} else {
+					types.add(typeInfo);
+				}
+			}
+			CollectRowTableSink sink = new CollectRowTableSink();
+			CollectTableSink<Row> configuredSink = (CollectTableSink<Row>) sink.configure(
+				schema.getFieldNames(), types.toArray(new TypeInformation[0]));
+			return JavaScalaConversionUtil.toJava(
+				BatchTableEnvUtil.collect(
+					t.getTableEnvironment(), table, configuredSink, Option.apply("JOB"),
+					EnvironmentSettings.DEFAULT_BUILTIN_CATALOG, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE));
 		}
 	}
 
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
index 32baf0c..985fe19 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 
 	protected static final String TEST_TABLE_1 = "testTable1";
 	protected static final String TEST_TABLE_2 = "testTable2";
+	protected static final String TEST_TABLE_3 = "testTable3";
 
 	protected static final String ROWKEY = "rk";
 	protected static final String FAMILY1 = "family1";
@@ -58,6 +59,8 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 	protected static final String F3COL2 = "col2";
 	protected static final String F3COL3 = "col3";
 
+	protected static final String FAMILY4 = "family4";
+
 	private static final byte[][] FAMILIES = new byte[][]{
 		Bytes.toBytes(FAMILY1),
 		Bytes.toBytes(FAMILY2),
@@ -100,6 +103,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 	private static void prepareTables() throws IOException {
 		createHBaseTable1();
 		createHBaseTable2();
+		createHBaseTable3();
 	}
 
 	private static void createHBaseTable1() throws IOException {
@@ -131,6 +135,18 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 		createTable(tableName, FAMILIES, SPLIT_KEYS);
 	}
 
+	private static void createHBaseTable3() {
+		// create a table
+		byte[][] families = new byte[][]{
+			Bytes.toBytes(FAMILY1),
+			Bytes.toBytes(FAMILY2),
+			Bytes.toBytes(FAMILY3),
+			Bytes.toBytes(FAMILY4),
+		};
+		TableName tableName = TableName.valueOf(TEST_TABLE_3);
+		createTable(tableName, families, SPLIT_KEYS);
+	}
+
 	private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
 		Put put = new Put(Bytes.toBytes(rowKey));
 		// family 1


[flink] 01/02: [FLINK-13699][table-api] Fix TableFactory doesn't work with DDL when containing TIMESTAMP/DATE/TIME types

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dca0879d8e992b92cff22b27fb15f598dd2c36d9
Author: Jark Wu <im...@gmail.com>
AuthorDate: Tue Aug 13 11:46:06 2019 +0800

    [FLINK-13699][table-api] Fix TableFactory doesn't work with DDL when containing TIMESTAMP/DATE/TIME types
    
    The solution is to encode DataTypes.TIMESTAMP() as "TIMESTAMP" when translating to
    properties, and to convert it back to the legacy TypeInformation Types.SQL_TIMESTAMP
    when reading them. This fixes all factories at once.
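
For illustration only (not part of this commit), a minimal round-trip sketch of the encoding described above. It uses TypeStringUtils.writeTypeInfo as exercised by the test below; readTypeInfo is assumed to be the matching deserialization entry point on the same utility:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.utils.TypeStringUtils;

    public class TimestampPropertyRoundTrip {

        public static void main(String[] args) {
            // The new LOCAL_DATE_TIME type serializes to the same property string
            // as the legacy SQL type ...
            String property = TypeStringUtils.writeTypeInfo(Types.LOCAL_DATE_TIME);
            System.out.println(property); // TIMESTAMP

            // ... and reading the property back yields the legacy TypeInformation,
            // which is what existing TableFactory implementations expect.
            TypeInformation<?> restored = TypeStringUtils.readTypeInfo(property);
            System.out.println(restored.equals(Types.SQL_TIMESTAMP)); // true
        }
    }
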
---
 .../java/org/apache/flink/table/utils/TypeStringUtils.java  |  9 ++++++---
 .../org/apache/flink/table/utils/TypeStringUtilsTest.java   | 13 +++++++++++++
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
index 6bf414a..d454ffb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
@@ -96,11 +96,14 @@ public class TypeStringUtils {
 			return DOUBLE;
 		} else if (typeInfo.equals(Types.BIG_DEC)) {
 			return DECIMAL;
-		} else if (typeInfo.equals(Types.SQL_DATE)) {
+		} else if (typeInfo.equals(Types.SQL_DATE) || typeInfo.equals(Types.LOCAL_DATE)) {
+			// write LOCAL_DATE as "DATE" to keep compatible when using new types
 			return DATE;
-		} else if (typeInfo.equals(Types.SQL_TIME)) {
+		} else if (typeInfo.equals(Types.SQL_TIME) || typeInfo.equals(Types.LOCAL_TIME)) {
+			// write LOCAL_TIME as "TIME" to keep compatible when using new types
 			return TIME;
-		} else if (typeInfo.equals(Types.SQL_TIMESTAMP)) {
+		} else if (typeInfo.equals(Types.SQL_TIMESTAMP) || typeInfo.equals(Types.LOCAL_DATE_TIME)) {
+			// write LOCAL_DATE_TIME as "TIMESTAMP" to keep compatible when using new types
 			return TIMESTAMP;
 		} else if (typeInfo instanceof RowTypeInfo) {
 			final RowTypeInfo rt = (RowTypeInfo) typeInfo;
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeStringUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeStringUtilsTest.java
index 746a8a1..a2f3045 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeStringUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeStringUtilsTest.java
@@ -47,6 +47,9 @@ public class TypeStringUtilsTest {
 		testReadAndWrite("DATE", Types.SQL_DATE);
 		testReadAndWrite("TIME", Types.SQL_TIME);
 		testReadAndWrite("TIMESTAMP", Types.SQL_TIMESTAMP);
+		testWrite("DATE", Types.LOCAL_DATE);
+		testWrite("TIME", Types.LOCAL_TIME);
+		testWrite("TIMESTAMP", Types.LOCAL_DATE_TIME);
 
 		// unsupported type information
 		testReadAndWrite(
@@ -129,6 +132,11 @@ public class TypeStringUtilsTest {
 				Types.ROW_NAMED(
 					new String[] {"Field 1", "Field`s 2"},
 					Types.ROW(Types.BIG_DEC), Types.STRING)));
+
+		testWrite("ROW<f0 DECIMAL, f1 TIMESTAMP, f2 TIME, f3 DATE>",
+			Types.ROW_NAMED(
+				new String[] {"f0", "f1", "f2", "f3"},
+				Types.BIG_DEC, Types.LOCAL_DATE_TIME, Types.LOCAL_TIME, Types.LOCAL_DATE));
 	}
 
 	@Test(expected = ValidationException.class)
@@ -154,6 +162,11 @@ public class TypeStringUtilsTest {
 		assertEquals(expected, TypeStringUtils.writeTypeInfo(type));
 	}
 
+	private void testWrite(String expected, TypeInformation<?> type) {
+		// test write to string
+		assertEquals(expected, TypeStringUtils.writeTypeInfo(type));
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	/**