You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/20 11:55:41 UTC

[flink] branch release-1.11 updated: [FLINK-17622][connectors/jdbc] Remove useless switch for decimal in PostgresCatalog

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 6097d97  [FLINK-17622][connectors/jdbc] Remove useless switch for decimal in PostgresCatalog
6097d97 is described below

commit 6097d97a39877758d2729242186a19d86220e6ea
Author: Flavio Pompermaier <f....@gmail.com>
AuthorDate: Wed May 20 13:53:35 2020 +0200

    [FLINK-17622][connectors/jdbc] Remove useless switch for decimal in PostgresCatalog
    
    This closes #12090
---
 .../flink/connector/jdbc/catalog/PostgresCatalog.java | 19 +++++++++++++++----
 .../connector/jdbc/catalog/PostgresCatalogITCase.java |  3 ++-
 .../jdbc/catalog/PostgresCatalogTestBase.java         |  6 ++++++
 3 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
index c9b1124..31b4185 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
@@ -214,6 +214,14 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
 		}
 	}
 
+	// Postgres jdbc driver maps several alias to real type, we use real type rather than alias:
+	// smallint <=> int2
+	// integer <=> int4
+	// int <=> int4
+	// bigint <=> int8
+	// float <=> float8
+	// boolean <=> bool
+	// decimal <=> numeric
 	public static final String PG_BYTEA = "bytea";
 	public static final String PG_BYTEA_ARRAY = "_bytea";
 	public static final String PG_SMALLINT = "int2";
@@ -224,8 +232,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
 	public static final String PG_BIGINT_ARRAY = "_int8";
 	public static final String PG_REAL = "float4";
 	public static final String PG_REAL_ARRAY = "_float4";
-	public static final String PG_DECIMAL = "decimal";
-	public static final String PG_DECIMAL_ARRAY = "_decimal";
 	public static final String PG_DOUBLE_PRECISION = "float8";
 	public static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
 	public static final String PG_NUMERIC = "numeric";
@@ -249,12 +255,19 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
 	public static final String PG_CHARACTER_VARYING = "varchar";
 	public static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
 
+	/**
+	 * Converts Postgres type to Flink {@link DataType}.
+	 *
+	 * @see org.postgresql.jdbc.TypeInfoCache
+	 */
 	private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
 		String pgType = metadata.getColumnTypeName(colIndex);
 
 		int precision = metadata.getPrecision(colIndex);
 		int scale = metadata.getScale(colIndex);
 
+		// pg types that gets replaced by jdbc driver:
+	    // - decimal => numeric
 		switch (pgType) {
 			case PG_BOOLEAN:
 				return DataTypes.BOOLEAN();
@@ -284,14 +297,12 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
 				return DataTypes.DOUBLE();
 			case PG_DOUBLE_PRECISION_ARRAY:
 				return DataTypes.ARRAY(DataTypes.DOUBLE());
-			case PG_DECIMAL:
 			case PG_NUMERIC:
 				// see SPARK-26538: handle numeric without explicit precision and scale.
 				if (precision > 0) {
 					return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
 				}
 				return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
-			case PG_DECIMAL_ARRAY:
 			case PG_NUMERIC_ARRAY:
 				// see SPARK-26538: handle numeric without explicit precision and scale.
 				if (precision > 0) {
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
index 422a81c..a5ad1ec 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
@@ -88,7 +88,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
 		List<Row> results = Lists.newArrayList(
 			tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE)).execute().collect());
 
-		assertEquals("[1,[50],3,4,5.5,6.6,7.70000,true,a,b,c  ,d,2016-06-22T19:10:25,2015-01-01,00:51:03,500.000000000000000000]", results.toString());
+		assertEquals("[1,[50],3,4,5.5,6.6,7.70000,8.8,true,a,b,c  ,d,2016-06-22T19:10:25,2015-01-01,00:51:03,500.000000000000000000]", results.toString());
 	}
 
 	@Test
@@ -107,6 +107,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
 				"[6.6, 7.7, 8.8]," +
 				"[7.70000, 8.80000, 9.90000]," +
 				"[8.800000000000000000, 9.900000000000000000, 10.100000000000000000]," +
+				"[9.90, 10.10, 11.11]," +
 				"[true, false, true]," +
 				"[a, b, c]," +
 				"[b, c, d]," +
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
index 62c0e1d..b4b1b44 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
@@ -164,6 +164,7 @@ public class PostgresCatalogTestBase {
 				.field("real", DataTypes.FLOAT())
 				.field("double_precision", DataTypes.DOUBLE())
 				.field("numeric", DataTypes.DECIMAL(10, 5))
+				.field("decimal", DataTypes.DECIMAL(10, 1))
 				.field("boolean", DataTypes.BOOLEAN())
 				.field("text", DataTypes.STRING())
 				.field("char", DataTypes.CHAR(1))
@@ -182,6 +183,7 @@ public class PostgresCatalogTestBase {
 				"real real, " +
 				"double_precision double precision, " +
 				"numeric numeric(10, 5), " +
+				"decimal decimal(10, 1), " +
 				"boolean boolean, " +
 				"text text, " +
 				"char char, " +
@@ -199,6 +201,7 @@ public class PostgresCatalogTestBase {
 				"5.5," +
 				"6.6," +
 				"7.7," +
+				"8.8," +
 				"true," +
 				"'a'," +
 				"'b'," +
@@ -224,6 +227,7 @@ public class PostgresCatalogTestBase {
 				.field("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
 				.field("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
 				.field("numeric_arr_default", DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)))
+				.field("decimal_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)))
 				.field("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
 				.field("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
 				.field("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
@@ -243,6 +247,7 @@ public class PostgresCatalogTestBase {
 				"double_precision_arr double precision[], " +
 				"numeric_arr numeric(10, 5)[], " +
 				"numeric_arr_default numeric[], " +
+				"decimal_arr decimal(10,2)[], " +
 				"boolean_arr boolean[], " +
 				"text_arr text[], " +
 				"char_arr char[], " +
@@ -261,6 +266,7 @@ public class PostgresCatalogTestBase {
 					"'{6.6,7.7,8.8}'," +
 					"'{7.7,8.8,9.9}'," +
 					"'{8.8,9.9,10.10}'," +
+					"'{9.9,10.10,11.11}'," +
 					"'{true,false,true}'," +
 					"'{a,b,c}'," +
 					"'{b,c,d}'," +