You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/20 20:14:29 UTC
[flink] branch master updated: Revert
"[FLINK-17356][jdbc][postgres] Add IT cases for inserting group by query
into posgres catalog table"
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f52899a Revert "[FLINK-17356][jdbc][postgres] Add IT cases for inserting group by query into posgres catalog table"
f52899a is described below
commit f52899a00affcf56bfa4905c23e975d51031e72b
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 20 22:04:48 2020 +0200
Revert "[FLINK-17356][jdbc][postgres] Add IT cases for inserting group by query into posgres catalog table"
This reverts commit 38ada4ad5ece2d28707e9403278133d8e5790ec0.
---
.../jdbc/catalog/AbstractJdbcCatalog.java | 32 ++++++++++++---------
.../connector/jdbc/catalog/PostgresCatalog.java | 12 +++-----
.../jdbc/catalog/PostgresCatalogITCase.java | 33 ++++------------------
.../jdbc/catalog/PostgresCatalogTest.java | 6 +---
.../jdbc/catalog/PostgresCatalogTestBase.java | 18 ++++--------
5 files changed, 34 insertions(+), 67 deletions(-)
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index 33603d2..199d481 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -55,12 +55,14 @@ import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Arrays;
+import java.util.AbstractMap;
+import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -124,29 +126,31 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
// ------ retrieve PK constraint ------
- protected Optional<UniqueConstraint> getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException {
+ protected UniqueConstraint getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException {
// According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
- Map<Integer, String> keySeqColumnName = new HashMap<>();
+ List<Map.Entry<Integer, String>> columnsWithIndex = null;
String pkName = null;
- while (rs.next()) {
+ while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
- pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+ pkName = rs.getString("PK_NAME");
int keySeq = rs.getInt("KEY_SEQ");
- keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+ if (columnsWithIndex == null) {
+ columnsWithIndex = new ArrayList<>();
+ }
+ columnsWithIndex.add(new AbstractMap.SimpleEntry<>(Integer.valueOf(keySeq), columnName));
}
- List<String> pkFields = Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
- keySeqColumnName.forEach(pkFields::set);
- if (!pkFields.isEmpty()) {
- // PK_NAME maybe null according to the javadoc, generate an unique name in that case
- pkName = pkName != null ? pkName : "pk_" + String.join("_", pkFields);
- return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+ if (columnsWithIndex != null) {
+ // sort columns by KEY_SEQ
+ columnsWithIndex.sort(Comparator.comparingInt(Map.Entry::getKey));
+ List<String> cols = columnsWithIndex.stream().map(Map.Entry::getValue).collect(Collectors.toList());
+ return UniqueConstraint.primaryKey(pkName, cols);
}
- return Optional.empty();
+ return null;
}
// ------ table factory ------
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
index a8bb4b0..cb20fb0 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
@@ -49,7 +49,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceSinkFactory.IDENTIFIER;
@@ -184,10 +183,7 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
String dbUrl = baseUrl + tablePath.getDatabaseName();
try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
DatabaseMetaData metaData = conn.getMetaData();
- Optional<UniqueConstraint> primaryKey = getPrimaryKey(
- metaData,
- pgPath.getPgSchemaName(),
- pgPath.getPgTableName());
+ UniqueConstraint pk = getPrimaryKey(metaData, pgPath.getPgSchemaName(), pgPath.getPgTableName());
PreparedStatement ps = conn.prepareStatement(
String.format("SELECT * FROM %s;", pgPath.getFullPath()));
@@ -207,9 +203,9 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
TableSchema.Builder tableBuilder = new TableSchema.Builder()
.fields(names, types);
- primaryKey.ifPresent(pk ->
- tableBuilder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0]))
- );
+ if (pk != null) {
+ tableBuilder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0]));
+ }
TableSchema tableSchema = tableBuilder.build();
Map<String, String> props = new HashMap<>();
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
index 5defb2e..a5ad1ec 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.catalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -69,40 +69,19 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
}
@Test
- public void testInsert() {
+ public void test_insert() throws Exception {
TableEnvironment tEnv = getTableEnvWithPgCatalog();
- TableEnvUtil.execInsertSqlAndWaitResult(
- tEnv,
- String.format("insert into %s select * from `%s`", TABLE4, TABLE1));
+ TableResult tableResult = tEnv.executeSql(String.format("insert into %s select * from `%s`", TABLE4, TABLE1));
+ // wait to finish
+ tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
List<Row> results = Lists.newArrayList(
- tEnv.sqlQuery(String.format("select * from %s", TABLE4)).execute().collect());
+ tEnv.sqlQuery(String.format("select * from %s", TABLE1)).execute().collect());
assertEquals("[1]", results.toString());
}
@Test
- public void testGroupByInsert() {
- TableEnvironment tEnv = getTableEnvWithPgCatalog();
-
- TableEnvUtil.execInsertSqlAndWaitResult(
- tEnv,
- String.format(
- "insert into `%s` " +
- "select `int`, cast('A' as bytes), `short`, max(`long`), max(`real`), " +
- "max(`double_precision`), max(`numeric`), max(`boolean`), max(`text`), " +
- "'B', 'C', max(`character_varying`), " +
- "max(`timestamp`), max(`date`), max(`time`), max(`default_numeric`) " +
- "from `%s` group by `int`, `short`",
- TABLE_PRIMITIVE_TYPE2,
- TABLE_PRIMITIVE_TYPE));
-
- List<Row> results = Lists.newArrayList(
- tEnv.sqlQuery(String.format("select * from `%s`", TABLE_PRIMITIVE_TYPE2)).execute().collect());
- assertEquals("[1,[65],3,4,5.5,6.6,7.70000,true,a,B,C ,d,2016-06-22T19:10:25,2015-01-01,00:51:03,500.000000000000000000]", results.toString());
- }
-
- @Test
public void testPrimitiveTypes() throws Exception {
TableEnvironment tEnv = getTableEnvWithPgCatalog();
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
index 7142d41..7787cc5 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
@@ -70,11 +70,7 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
public void testListTables() throws DatabaseNotExistException {
List<String> actual = catalog.listTables(PostgresCatalog.DEFAULT_DATABASE);
- assertEquals(
- Arrays.asList(
- "public.array_table", "public.primitive_table", "public.primitive_table2",
- "public.t1", "public.t4", "public.t5"),
- actual);
+ assertEquals(Arrays.asList("public.dt", "public.dt2", "public.t1", "public.t4", "public.t5"), actual);
actual = catalog.listTables(TEST_DB);
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
index 96606cc..fd916e7 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
@@ -55,9 +55,8 @@ public class PostgresCatalogTestBase {
protected static final String TABLE3 = "t3";
protected static final String TABLE4 = "t4";
protected static final String TABLE5 = "t5";
- protected static final String TABLE_PRIMITIVE_TYPE = "primitive_table";
- protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2";
- protected static final String TABLE_ARRAY_TYPE = "array_table";
+ protected static final String TABLE_PRIMITIVE_TYPE = "dt";
+ protected static final String TABLE_ARRAY_TYPE = "dt2";
protected static String baseUrl;
protected static PostgresCatalog catalog;
@@ -90,7 +89,6 @@ public class PostgresCatalogTestBase {
createTable(TEST_DB, PostgresTablePath.fromFlinkTableName(TABLE2), getSimpleTable().pgSchemaSql);
createTable(TEST_DB, new PostgresTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql);
createTable(PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE), getPrimitiveTable().pgSchemaSql);
- createTable(PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2), getPrimitiveTable("test_pk2").pgSchemaSql);
createTable(PostgresTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE), getArrayTable().pgSchemaSql);
executeSQL(PostgresCatalog.DEFAULT_DATABASE, String.format("insert into public.%s values (%s);", TABLE1, getSimpleTable().values));
@@ -152,17 +150,11 @@ public class PostgresCatalogTestBase {
);
}
- // posgres doesn't support to use the same primary key name across different tables,
- // make the table parameterized to resolve this problem.
- public static TestTable getPrimitiveTable() {
- return getPrimitiveTable("test_pk");
- }
-
// TODO: add back timestamptz and time types.
// Flink currently doens't support converting time's precision, with the following error
// TableException: Unsupported conversion from data type 'TIME(6)' (conversion class: java.sql.Time)
// to type information. Only data types that originated from type information fully support a reverse conversion.
- public static TestTable getPrimitiveTable(String primaryKeyName) {
+ public static TestTable getPrimitiveTable() {
return new TestTable(
TableSchema.builder()
.field("int", DataTypes.INT().notNull())
@@ -183,7 +175,7 @@ public class PostgresCatalogTestBase {
.field("date", DataTypes.DATE())
.field("time", DataTypes.TIME(0))
.field("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))
- .primaryKey(primaryKeyName, new String[]{"short", "int"})
+ .primaryKey("test_pk", new String[]{"int", "short"})
.build(),
"int integer, " +
"bytea bytea, " +
@@ -203,7 +195,7 @@ public class PostgresCatalogTestBase {
"date date," +
"time time(0), " +
"default_numeric numeric, " +
- "CONSTRAINT " + primaryKeyName + " PRIMARY KEY (short, int)",
+ "CONSTRAINT test_pk PRIMARY KEY (int, short)",
"1," +
"'2'," +
"3," +