You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/21 02:16:23 UTC

[flink] branch release-1.11 updated: Revert "[FLINK-17356][jdbc][postgres] Add IT cases for inserting group by query into posgres catalog table"

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 4d84721  Revert "[FLINK-17356][jdbc][postgres] Add IT cases for inserting group by query into posgres catalog table"
4d84721 is described below

commit 4d84721ff520ada5dfed432639d2ae34e3479ee5
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 20 22:04:48 2020 +0200

    Revert "[FLINK-17356][jdbc][postgres] Add IT cases for inserting group by query into posgres catalog table"
    
    This reverts commit 38ada4ad5ece2d28707e9403278133d8e5790ec0.
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 32 ++++++++++++---------
 .../connector/jdbc/catalog/PostgresCatalog.java    | 12 +++-----
 .../jdbc/catalog/PostgresCatalogITCase.java        | 33 ++++------------------
 .../jdbc/catalog/PostgresCatalogTest.java          |  6 +---
 .../jdbc/catalog/PostgresCatalogTestBase.java      | 18 ++++--------
 5 files changed, 34 insertions(+), 67 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index 33603d2..199d481 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -55,12 +55,14 @@ import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Arrays;
+import java.util.AbstractMap;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -124,29 +126,31 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
 
 	// ------ retrieve PK constraint ------
 
-	protected Optional<UniqueConstraint> getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException {
+	protected UniqueConstraint getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException {
 
 		// According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
 		// the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
 		// We need to sort them based on the KEY_SEQ value.
 		ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
 
-		Map<Integer, String> keySeqColumnName = new HashMap<>();
+		List<Map.Entry<Integer, String>> columnsWithIndex = null;
 		String pkName = null;
-		while (rs.next())  {
+		while (rs.next()) {
 			String columnName = rs.getString("COLUMN_NAME");
-			pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+			pkName = rs.getString("PK_NAME");
 			int keySeq = rs.getInt("KEY_SEQ");
-			keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+			if (columnsWithIndex == null) {
+				columnsWithIndex = new ArrayList<>();
+			}
+			columnsWithIndex.add(new AbstractMap.SimpleEntry<>(Integer.valueOf(keySeq), columnName));
 		}
-		List<String> pkFields = Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
-		keySeqColumnName.forEach(pkFields::set);
-		if (!pkFields.isEmpty()) {
-			// PK_NAME maybe null according to the javadoc, generate an unique name in that case
-			pkName = pkName != null ? pkName : "pk_" + String.join("_", pkFields);
-			return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+		if (columnsWithIndex != null) {
+			// sort columns by KEY_SEQ
+			columnsWithIndex.sort(Comparator.comparingInt(Map.Entry::getKey));
+			List<String> cols = columnsWithIndex.stream().map(Map.Entry::getValue).collect(Collectors.toList());
+			return UniqueConstraint.primaryKey(pkName, cols);
 		}
-		return Optional.empty();
+		return null;
 	}
 
 	// ------ table factory ------
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
index a8bb4b0..cb20fb0 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
@@ -49,7 +49,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceSinkFactory.IDENTIFIER;
@@ -184,10 +183,7 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
 		String dbUrl = baseUrl + tablePath.getDatabaseName();
 		try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
 			DatabaseMetaData metaData = conn.getMetaData();
-			Optional<UniqueConstraint> primaryKey = getPrimaryKey(
-				metaData,
-				pgPath.getPgSchemaName(),
-				pgPath.getPgTableName());
+			UniqueConstraint pk = getPrimaryKey(metaData, pgPath.getPgSchemaName(), pgPath.getPgTableName());
 
 			PreparedStatement ps = conn.prepareStatement(
 				String.format("SELECT * FROM %s;", pgPath.getFullPath()));
@@ -207,9 +203,9 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
 
 			TableSchema.Builder tableBuilder = new TableSchema.Builder()
 				.fields(names, types);
-			primaryKey.ifPresent(pk ->
-				tableBuilder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0]))
-			);
+			if (pk != null) {
+				tableBuilder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0]));
+			}
 			TableSchema tableSchema = tableBuilder.build();
 
 			Map<String, String> props = new HashMap<>();
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
index 5defb2e..a5ad1ec 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.catalog;
 
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.types.Row;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -69,40 +69,19 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
 	}
 
 	@Test
-	public void testInsert() {
+	public void test_insert() throws Exception {
 		TableEnvironment tEnv = getTableEnvWithPgCatalog();
 
-		TableEnvUtil.execInsertSqlAndWaitResult(
-			tEnv,
-			String.format("insert into %s select * from `%s`", TABLE4, TABLE1));
+		TableResult tableResult = tEnv.executeSql(String.format("insert into %s select * from `%s`", TABLE4, TABLE1));
+		// wait to finish
+		tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
 
 		List<Row> results = Lists.newArrayList(
-			tEnv.sqlQuery(String.format("select * from %s", TABLE4)).execute().collect());
+			tEnv.sqlQuery(String.format("select * from %s", TABLE1)).execute().collect());
 		assertEquals("[1]", results.toString());
 	}
 
 	@Test
-	public void testGroupByInsert() {
-		TableEnvironment tEnv = getTableEnvWithPgCatalog();
-
-		TableEnvUtil.execInsertSqlAndWaitResult(
-			tEnv,
-			String.format(
-				"insert into `%s` " +
-					"select `int`, cast('A' as bytes), `short`, max(`long`), max(`real`), " +
-					"max(`double_precision`), max(`numeric`), max(`boolean`), max(`text`), " +
-					"'B', 'C', max(`character_varying`), " +
-					"max(`timestamp`), max(`date`), max(`time`), max(`default_numeric`) " +
-					"from `%s` group by `int`, `short`",
-				TABLE_PRIMITIVE_TYPE2,
-				TABLE_PRIMITIVE_TYPE));
-
-		List<Row> results = Lists.newArrayList(
-			tEnv.sqlQuery(String.format("select * from `%s`", TABLE_PRIMITIVE_TYPE2)).execute().collect());
-		assertEquals("[1,[65],3,4,5.5,6.6,7.70000,true,a,B,C  ,d,2016-06-22T19:10:25,2015-01-01,00:51:03,500.000000000000000000]", results.toString());
-	}
-
-	@Test
 	public void testPrimitiveTypes() throws Exception {
 		TableEnvironment tEnv = getTableEnvWithPgCatalog();
 
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
index 7142d41..7787cc5 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
@@ -70,11 +70,7 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
 	public void testListTables() throws DatabaseNotExistException {
 		List<String> actual = catalog.listTables(PostgresCatalog.DEFAULT_DATABASE);
 
-		assertEquals(
-			Arrays.asList(
-				"public.array_table", "public.primitive_table", "public.primitive_table2",
-				"public.t1", "public.t4", "public.t5"),
-			actual);
+		assertEquals(Arrays.asList("public.dt", "public.dt2", "public.t1", "public.t4", "public.t5"), actual);
 
 		actual = catalog.listTables(TEST_DB);
 
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
index 96606cc..fd916e7 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
@@ -55,9 +55,8 @@ public class PostgresCatalogTestBase {
 	protected static final String TABLE3 = "t3";
 	protected static final String TABLE4 = "t4";
 	protected static final String TABLE5 = "t5";
-	protected static final String TABLE_PRIMITIVE_TYPE = "primitive_table";
-	protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2";
-	protected static final String TABLE_ARRAY_TYPE = "array_table";
+	protected static final String TABLE_PRIMITIVE_TYPE = "dt";
+	protected static final String TABLE_ARRAY_TYPE = "dt2";
 
 	protected static String baseUrl;
 	protected static PostgresCatalog catalog;
@@ -90,7 +89,6 @@ public class PostgresCatalogTestBase {
 		createTable(TEST_DB, PostgresTablePath.fromFlinkTableName(TABLE2), getSimpleTable().pgSchemaSql);
 		createTable(TEST_DB, new PostgresTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql);
 		createTable(PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE), getPrimitiveTable().pgSchemaSql);
-		createTable(PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2), getPrimitiveTable("test_pk2").pgSchemaSql);
 		createTable(PostgresTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE), getArrayTable().pgSchemaSql);
 
 		executeSQL(PostgresCatalog.DEFAULT_DATABASE, String.format("insert into public.%s values (%s);", TABLE1, getSimpleTable().values));
@@ -152,17 +150,11 @@ public class PostgresCatalogTestBase {
 		);
 	}
 
-	// posgres doesn't support to use the same primary key name across different tables,
-	// make the table parameterized to resolve this problem.
-	public static TestTable getPrimitiveTable() {
-		return getPrimitiveTable("test_pk");
-	}
-
 	// TODO: add back timestamptz and time types.
 	//  Flink currently doens't support converting time's precision, with the following error
 	//  TableException: Unsupported conversion from data type 'TIME(6)' (conversion class: java.sql.Time)
 	//  to type information. Only data types that originated from type information fully support a reverse conversion.
-	public static TestTable getPrimitiveTable(String primaryKeyName) {
+	public static TestTable getPrimitiveTable() {
 		return new TestTable(
 			TableSchema.builder()
 				.field("int", DataTypes.INT().notNull())
@@ -183,7 +175,7 @@ public class PostgresCatalogTestBase {
 				.field("date", DataTypes.DATE())
 				.field("time", DataTypes.TIME(0))
 				.field("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))
-				.primaryKey(primaryKeyName, new String[]{"short", "int"})
+				.primaryKey("test_pk", new String[]{"int", "short"})
 				.build(),
 			"int integer, " +
 				"bytea bytea, " +
@@ -203,7 +195,7 @@ public class PostgresCatalogTestBase {
 				"date date," +
 				"time time(0), " +
 				"default_numeric numeric, " +
-				"CONSTRAINT " + primaryKeyName + " PRIMARY KEY (short, int)",
+				"CONSTRAINT test_pk PRIMARY KEY (int, short)",
 			"1," +
 				"'2'," +
 				"3," +