You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/20 11:55:41 UTC
[flink] branch release-1.11 updated: [FLINK-17622][connectors/jdbc]
Remove useless switch for decimal in PostgresCatalog
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 6097d97 [FLINK-17622][connectors/jdbc] Remove useless switch for decimal in PostgresCatalog
6097d97 is described below
commit 6097d97a39877758d2729242186a19d86220e6ea
Author: Flavio Pompermaier <f....@gmail.com>
AuthorDate: Wed May 20 13:53:35 2020 +0200
[FLINK-17622][connectors/jdbc] Remove useless switch for decimal in PostgresCatalog
This closes #12090
---
.../flink/connector/jdbc/catalog/PostgresCatalog.java | 19 +++++++++++++++----
.../connector/jdbc/catalog/PostgresCatalogITCase.java | 3 ++-
.../jdbc/catalog/PostgresCatalogTestBase.java | 6 ++++++
3 files changed, 23 insertions(+), 5 deletions(-)
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
index c9b1124..31b4185 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
@@ -214,6 +214,14 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
}
}
+ // Postgres jdbc driver maps several alias to real type, we use real type rather than alias:
+ // smallint <=> int2
+ // integer <=> int4
+ // int <=> int4
+ // bigint <=> int8
+ // float <=> float8
+ // boolean <=> bool
+ // decimal <=> numeric
public static final String PG_BYTEA = "bytea";
public static final String PG_BYTEA_ARRAY = "_bytea";
public static final String PG_SMALLINT = "int2";
@@ -224,8 +232,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
public static final String PG_BIGINT_ARRAY = "_int8";
public static final String PG_REAL = "float4";
public static final String PG_REAL_ARRAY = "_float4";
- public static final String PG_DECIMAL = "decimal";
- public static final String PG_DECIMAL_ARRAY = "_decimal";
public static final String PG_DOUBLE_PRECISION = "float8";
public static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
public static final String PG_NUMERIC = "numeric";
@@ -249,12 +255,19 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
public static final String PG_CHARACTER_VARYING = "varchar";
public static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+ /**
+ * Converts Postgres type to Flink {@link DataType}.
+ *
+ * @see org.postgresql.jdbc.TypeInfoCache
+ */
private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
String pgType = metadata.getColumnTypeName(colIndex);
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
+ // pg types that gets replaced by jdbc driver:
+ // - decimal => numeric
switch (pgType) {
case PG_BOOLEAN:
return DataTypes.BOOLEAN();
@@ -284,14 +297,12 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
return DataTypes.DOUBLE();
case PG_DOUBLE_PRECISION_ARRAY:
return DataTypes.ARRAY(DataTypes.DOUBLE());
- case PG_DECIMAL:
case PG_NUMERIC:
// see SPARK-26538: handle numeric without explicit precision and scale.
if (precision > 0) {
return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
}
return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
- case PG_DECIMAL_ARRAY:
case PG_NUMERIC_ARRAY:
// see SPARK-26538: handle numeric without explicit precision and scale.
if (precision > 0) {
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
index 422a81c..a5ad1ec 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
@@ -88,7 +88,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
List<Row> results = Lists.newArrayList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE)).execute().collect());
- assertEquals("[1,[50],3,4,5.5,6.6,7.70000,true,a,b,c ,d,2016-06-22T19:10:25,2015-01-01,00:51:03,500.000000000000000000]", results.toString());
+ assertEquals("[1,[50],3,4,5.5,6.6,7.70000,8.8,true,a,b,c ,d,2016-06-22T19:10:25,2015-01-01,00:51:03,500.000000000000000000]", results.toString());
}
@Test
@@ -107,6 +107,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
"[6.6, 7.7, 8.8]," +
"[7.70000, 8.80000, 9.90000]," +
"[8.800000000000000000, 9.900000000000000000, 10.100000000000000000]," +
+ "[9.90, 10.10, 11.11]," +
"[true, false, true]," +
"[a, b, c]," +
"[b, c, d]," +
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
index 62c0e1d..b4b1b44 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
@@ -164,6 +164,7 @@ public class PostgresCatalogTestBase {
.field("real", DataTypes.FLOAT())
.field("double_precision", DataTypes.DOUBLE())
.field("numeric", DataTypes.DECIMAL(10, 5))
+ .field("decimal", DataTypes.DECIMAL(10, 1))
.field("boolean", DataTypes.BOOLEAN())
.field("text", DataTypes.STRING())
.field("char", DataTypes.CHAR(1))
@@ -182,6 +183,7 @@ public class PostgresCatalogTestBase {
"real real, " +
"double_precision double precision, " +
"numeric numeric(10, 5), " +
+ "decimal decimal(10, 1), " +
"boolean boolean, " +
"text text, " +
"char char, " +
@@ -199,6 +201,7 @@ public class PostgresCatalogTestBase {
"5.5," +
"6.6," +
"7.7," +
+ "8.8," +
"true," +
"'a'," +
"'b'," +
@@ -224,6 +227,7 @@ public class PostgresCatalogTestBase {
.field("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
.field("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
.field("numeric_arr_default", DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)))
+ .field("decimal_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)))
.field("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
.field("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
.field("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
@@ -243,6 +247,7 @@ public class PostgresCatalogTestBase {
"double_precision_arr double precision[], " +
"numeric_arr numeric(10, 5)[], " +
"numeric_arr_default numeric[], " +
+ "decimal_arr decimal(10,2)[], " +
"boolean_arr boolean[], " +
"text_arr text[], " +
"char_arr char[], " +
@@ -261,6 +266,7 @@ public class PostgresCatalogTestBase {
"'{6.6,7.7,8.8}'," +
"'{7.7,8.8,9.9}'," +
"'{8.8,9.9,10.10}'," +
+ "'{9.9,10.10,11.11}'," +
"'{true,false,true}'," +
"'{a,b,c}'," +
"'{b,c,d}'," +