You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/28 07:48:29 UTC

[GitHub] [flink] wuchong commented on a change in pull request #11381: [FLINK-16294][connectors /jdbc]Support to create non-existed table in database automatically when writing data to JDBC connector.

wuchong commented on a change in pull request #11381:
URL: https://github.com/apache/flink/pull/11381#discussion_r416374268



##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcBatchingOutputFormat.java
##########
@@ -105,6 +119,66 @@ public void open(int taskNumber, int numTasks) throws IOException {
 		}
 	}
 
+	private void tryCreateTableIfNotExists(int taskNumber, JdbcDdlOptions jdbcDdlOptions) throws SQLException, ClassNotFoundException {
+		// only try to create table from one TM, simply choice the minimum TM as leader
+		// after FLIP-27,this can be refactor better.
+		final boolean isCreateTableLeader = taskNumber == 0 ? true : false;

Review comment:
       Simplify to `final boolean isCreateTableLeader = taskNumber == 0;` 

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcBatchingOutputFormat.java
##########
@@ -105,6 +119,66 @@ public void open(int taskNumber, int numTasks) throws IOException {
 		}
 	}
 
+	private void tryCreateTableIfNotExists(int taskNumber, JdbcDdlOptions jdbcDdlOptions) throws SQLException, ClassNotFoundException {
+		// only try to create table from one TM, simply choice the minimum TM as leader
+		// after FLIP-27,this can be refactor better.
+		final boolean isCreateTableLeader = taskNumber == 0 ? true : false;

Review comment:
       Simplify to `final boolean isCreateTableLeader = taskNumber == 0;` 

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcBatchingOutputFormat.java
##########
@@ -105,6 +119,66 @@ public void open(int taskNumber, int numTasks) throws IOException {
 		}
 	}
 
+	private void tryCreateTableIfNotExists(int taskNumber, JdbcDdlOptions jdbcDdlOptions) throws SQLException, ClassNotFoundException {
+		// only try to create table from one TM, simply choice the minimum TM as leader
+		// after FLIP-27,this can be refactor better.
+		final boolean isCreateTableLeader = taskNumber == 0 ? true : false;
+		final JDBCOptions jdbcOptions = jdbcDdlOptions.getJdbcOptions();
+		final int maxRetries = jdbcDdlOptions.getMaxRetry();
+
+		int retry = 1;
+		if (isCreateTableLeader) {
+			while (retry <= maxRetries) {
+				try (Connection conn = new SimpleJdbcConnectionProvider(jdbcOptions).getConnection(); Statement st = conn.createStatement()) {
+					st.execute(jdbcDdlOptions.getCreateTableStatement());
+					break;
+				} catch (ClassNotFoundException e) {
+					throw new RuntimeException(
+						String.format("JDBC Driver class %s can not found.", jdbcOptions.getDriverName()), e);
+				} catch (SQLException e) {
+					if (retry == maxRetries) {
+						throw new RuntimeException(
+							String.format("Create table %s failed after %s attempts.", jdbcOptions.getTableName(), retry), e);
+					}
+					try {
+						Thread.sleep(1000 * retry);
+					} catch (InterruptedException ine) {
+						Thread.currentThread().interrupt();
+						throw new RuntimeException("Interrupted while retry.", ine);
+					}
+				}
+			}
+			retry++;

Review comment:
       `retry++` should be inside the `while` block.

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcBatchingOutputFormat.java
##########
@@ -105,6 +119,66 @@ public void open(int taskNumber, int numTasks) throws IOException {
 		}
 	}
 
+	private void tryCreateTableIfNotExists(int taskNumber, JdbcDdlOptions jdbcDdlOptions) throws SQLException, ClassNotFoundException {
+		// only try to create table from one TM, simply choice the minimum TM as leader
+		// after FLIP-27,this can be refactor better.
+		final boolean isCreateTableLeader = taskNumber == 0 ? true : false;
+		final JDBCOptions jdbcOptions = jdbcDdlOptions.getJdbcOptions();
+		final int maxRetries = jdbcDdlOptions.getMaxRetry();
+
+		int retry = 1;
+		if (isCreateTableLeader) {
+			while (retry <= maxRetries) {
+				try (Connection conn = new SimpleJdbcConnectionProvider(jdbcOptions).getConnection(); Statement st = conn.createStatement()) {
+					st.execute(jdbcDdlOptions.getCreateTableStatement());
+					break;
+				} catch (ClassNotFoundException e) {
+					throw new RuntimeException(
+						String.format("JDBC Driver class %s can not found.", jdbcOptions.getDriverName()), e);
+				} catch (SQLException e) {
+					if (retry == maxRetries) {
+						throw new RuntimeException(

Review comment:
       Use `IOException`.

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcBatchingOutputFormat.java
##########
@@ -105,6 +119,66 @@ public void open(int taskNumber, int numTasks) throws IOException {
 		}
 	}
 
+	private void tryCreateTableIfNotExists(int taskNumber, JdbcDdlOptions jdbcDdlOptions) throws SQLException, ClassNotFoundException {
+		// only try to create table from one TM, simply choice the minimum TM as leader
+		// after FLIP-27,this can be refactor better.
+		final boolean isCreateTableLeader = taskNumber == 0 ? true : false;
+		final JDBCOptions jdbcOptions = jdbcDdlOptions.getJdbcOptions();
+		final int maxRetries = jdbcDdlOptions.getMaxRetry();
+
+		int retry = 1;
+		if (isCreateTableLeader) {
+			while (retry <= maxRetries) {
+				try (Connection conn = new SimpleJdbcConnectionProvider(jdbcOptions).getConnection(); Statement st = conn.createStatement()) {
+					st.execute(jdbcDdlOptions.getCreateTableStatement());
+					break;
+				} catch (ClassNotFoundException e) {
+					throw new RuntimeException(
+						String.format("JDBC Driver class %s can not found.", jdbcOptions.getDriverName()), e);
+				} catch (SQLException e) {
+					if (retry == maxRetries) {
+						throw new RuntimeException(
+							String.format("Create table %s failed after %s attempts.", jdbcOptions.getTableName(), retry), e);
+					}
+					try {
+						Thread.sleep(1000 * retry);
+					} catch (InterruptedException ine) {
+						Thread.currentThread().interrupt();
+						throw new RuntimeException("Interrupted while retry.", ine);
+					}
+				}
+			}
+			retry++;
+		} else {
+			// just wait leader to create table
+			while (retry <= maxRetries) {
+				try {
+					Thread.sleep(1000 * retry);
+				} catch (InterruptedException ine) {
+					Thread.currentThread().interrupt();
+					throw new RuntimeException("Interrupted while retry.", ine);
+				}
+				try (Connection conn = new SimpleJdbcConnectionProvider(jdbcOptions).getConnection();) {
+					DatabaseMetaData dbMeta = conn.getMetaData();
+					ResultSet tables = dbMeta.getTables(null, null, jdbcOptions.getTableName(), null);
+					if (tables.next()) {
+						return;

Review comment:
       We can extract this into a method. The task that creates the table also needs this method to check table existence first.

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcBatchingOutputFormat.java
##########
@@ -105,6 +119,66 @@ public void open(int taskNumber, int numTasks) throws IOException {
 		}
 	}
 
+	private void tryCreateTableIfNotExists(int taskNumber, JdbcDdlOptions jdbcDdlOptions) throws SQLException, ClassNotFoundException {
+		// only try to create table from one TM, simply choice the minimum TM as leader
+		// after FLIP-27,this can be refactor better.
+		final boolean isCreateTableLeader = taskNumber == 0 ? true : false;
+		final JDBCOptions jdbcOptions = jdbcDdlOptions.getJdbcOptions();
+		final int maxRetries = jdbcDdlOptions.getMaxRetry();
+
+		int retry = 1;
+		if (isCreateTableLeader) {
+			while (retry <= maxRetries) {
+				try (Connection conn = new SimpleJdbcConnectionProvider(jdbcOptions).getConnection(); Statement st = conn.createStatement()) {
+					st.execute(jdbcDdlOptions.getCreateTableStatement());
+					break;
+				} catch (ClassNotFoundException e) {
+					throw new RuntimeException(
+						String.format("JDBC Driver class %s can not found.", jdbcOptions.getDriverName()), e);
+				} catch (SQLException e) {
+					if (retry == maxRetries) {
+						throw new RuntimeException(
+							String.format("Create table %s failed after %s attempts.", jdbcOptions.getTableName(), retry), e);
+					}
+					try {
+						Thread.sleep(1000 * retry);
+					} catch (InterruptedException ine) {
+						Thread.currentThread().interrupt();
+						throw new RuntimeException("Interrupted while retry.", ine);
+					}
+				}
+			}
+			retry++;
+		} else {
+			// just wait leader to create table
+			while (retry <= maxRetries) {
+				try {
+					Thread.sleep(1000 * retry);
+				} catch (InterruptedException ine) {
+					Thread.currentThread().interrupt();
+					throw new RuntimeException("Interrupted while retry.", ine);
+				}
+				try (Connection conn = new SimpleJdbcConnectionProvider(jdbcOptions).getConnection();) {
+					DatabaseMetaData dbMeta = conn.getMetaData();
+					ResultSet tables = dbMeta.getTables(null, null, jdbcOptions.getTableName(), null);
+					if (tables.next()) {
+						return;
+					} else {
+						if (retry == maxRetries) {
+							int timeOut = 0;
+							for (int i = 1; i <= maxRetries; i++) {
+								timeOut += i * 1000;
+							}

Review comment:
       Calculate `timeout` in the `while` block to avoid this extra `for` loop.

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcBatchingOutputFormat.java
##########
@@ -105,6 +119,66 @@ public void open(int taskNumber, int numTasks) throws IOException {
 		}
 	}
 
+	private void tryCreateTableIfNotExists(int taskNumber, JdbcDdlOptions jdbcDdlOptions) throws SQLException, ClassNotFoundException {
+		// only try to create table from one TM, simply choice the minimum TM as leader
+		// after FLIP-27,this can be refactor better.
+		final boolean isCreateTableLeader = taskNumber == 0 ? true : false;
+		final JDBCOptions jdbcOptions = jdbcDdlOptions.getJdbcOptions();
+		final int maxRetries = jdbcDdlOptions.getMaxRetry();
+
+		int retry = 1;
+		if (isCreateTableLeader) {
+			while (retry <= maxRetries) {
+				try (Connection conn = new SimpleJdbcConnectionProvider(jdbcOptions).getConnection(); Statement st = conn.createStatement()) {
+					st.execute(jdbcDdlOptions.getCreateTableStatement());
+					break;
+				} catch (ClassNotFoundException e) {
+					throw new RuntimeException(
+						String.format("JDBC Driver class %s can not found.", jdbcOptions.getDriverName()), e);
+				} catch (SQLException e) {
+					if (retry == maxRetries) {
+						throw new RuntimeException(
+							String.format("Create table %s failed after %s attempts.", jdbcOptions.getTableName(), retry), e);
+					}
+					try {
+						Thread.sleep(1000 * retry);
+					} catch (InterruptedException ine) {
+						Thread.currentThread().interrupt();
+						throw new RuntimeException("Interrupted while retry.", ine);
+					}
+				}
+			}
+			retry++;
+		} else {
+			// just wait leader to create table
+			while (retry <= maxRetries) {
+				try {
+					Thread.sleep(1000 * retry);
+				} catch (InterruptedException ine) {
+					Thread.currentThread().interrupt();
+					throw new RuntimeException("Interrupted while retry.", ine);
+				}
+				try (Connection conn = new SimpleJdbcConnectionProvider(jdbcOptions).getConnection();) {
+					DatabaseMetaData dbMeta = conn.getMetaData();
+					ResultSet tables = dbMeta.getTables(null, null, jdbcOptions.getTableName(), null);
+					if (tables.next()) {
+						return;
+					} else {
+						if (retry == maxRetries) {

Review comment:
       `retry >= maxRetries`

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcBatchingOutputFormat.java
##########
@@ -105,6 +119,66 @@ public void open(int taskNumber, int numTasks) throws IOException {
 		}
 	}
 
+	private void tryCreateTableIfNotExists(int taskNumber, JdbcDdlOptions jdbcDdlOptions) throws SQLException, ClassNotFoundException {
+		// only try to create table from one TM, simply choice the minimum TM as leader
+		// after FLIP-27,this can be refactor better.
+		final boolean isCreateTableLeader = taskNumber == 0 ? true : false;
+		final JDBCOptions jdbcOptions = jdbcDdlOptions.getJdbcOptions();
+		final int maxRetries = jdbcDdlOptions.getMaxRetry();
+
+		int retry = 1;
+		if (isCreateTableLeader) {
+			while (retry <= maxRetries) {
+				try (Connection conn = new SimpleJdbcConnectionProvider(jdbcOptions).getConnection(); Statement st = conn.createStatement()) {
+					st.execute(jdbcDdlOptions.getCreateTableStatement());
+					break;
+				} catch (ClassNotFoundException e) {
+					throw new RuntimeException(
+						String.format("JDBC Driver class %s can not found.", jdbcOptions.getDriverName()), e);
+				} catch (SQLException e) {
+					if (retry == maxRetries) {
+						throw new RuntimeException(
+							String.format("Create table %s failed after %s attempts.", jdbcOptions.getTableName(), retry), e);
+					}
+					try {
+						Thread.sleep(1000 * retry);
+					} catch (InterruptedException ine) {
+						Thread.currentThread().interrupt();
+						throw new RuntimeException("Interrupted while retry.", ine);
+					}
+				}
+			}
+			retry++;
+		} else {
+			// just wait leader to create table
+			while (retry <= maxRetries) {
+				try {
+					Thread.sleep(1000 * retry);
+				} catch (InterruptedException ine) {
+					Thread.currentThread().interrupt();
+					throw new RuntimeException("Interrupted while retry.", ine);

Review comment:
       Move the sleep after the table existence check.

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcBatchingOutputFormat.java
##########
@@ -105,6 +119,66 @@ public void open(int taskNumber, int numTasks) throws IOException {
 		}
 	}
 
+	private void tryCreateTableIfNotExists(int taskNumber, JdbcDdlOptions jdbcDdlOptions) throws SQLException, ClassNotFoundException {
+		// only try to create table from one TM, simply choice the minimum TM as leader
+		// after FLIP-27,this can be refactor better.
+		final boolean isCreateTableLeader = taskNumber == 0 ? true : false;
+		final JDBCOptions jdbcOptions = jdbcDdlOptions.getJdbcOptions();
+		final int maxRetries = jdbcDdlOptions.getMaxRetry();
+
+		int retry = 1;
+		if (isCreateTableLeader) {
+			while (retry <= maxRetries) {
+				try (Connection conn = new SimpleJdbcConnectionProvider(jdbcOptions).getConnection(); Statement st = conn.createStatement()) {
+					st.execute(jdbcDdlOptions.getCreateTableStatement());
+					break;
+				} catch (ClassNotFoundException e) {
+					throw new RuntimeException(
+						String.format("JDBC Driver class %s can not found.", jdbcOptions.getDriverName()), e);
+				} catch (SQLException e) {
+					if (retry == maxRetries) {

Review comment:
       `retry >= maxRetries`

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcBatchingOutputFormat.java
##########
@@ -105,6 +119,66 @@ public void open(int taskNumber, int numTasks) throws IOException {
 		}
 	}
 
+	private void tryCreateTableIfNotExists(int taskNumber, JdbcDdlOptions jdbcDdlOptions) throws SQLException, ClassNotFoundException {
+		// only try to create table from one TM, simply choice the minimum TM as leader
+		// after FLIP-27,this can be refactor better.
+		final boolean isCreateTableLeader = taskNumber == 0 ? true : false;
+		final JDBCOptions jdbcOptions = jdbcDdlOptions.getJdbcOptions();
+		final int maxRetries = jdbcDdlOptions.getMaxRetry();
+
+		int retry = 1;
+		if (isCreateTableLeader) {
+			while (retry <= maxRetries) {
+				try (Connection conn = new SimpleJdbcConnectionProvider(jdbcOptions).getConnection(); Statement st = conn.createStatement()) {

Review comment:
       Reuse the `connection` initialized in `AbstractJdbcOutputFormat#establishConnection`?

##########
File path: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
##########
@@ -252,4 +282,100 @@ public void testBatchSink() throws Exception {
 				Row.of("Bob", 1)
 		}, DB_URL, OUTPUT_TABLE3, new String[]{"NAME", "SCORE"});
 	}
+
+	@Test
+	public void testCreateTableIfNotExists() throws Exception {

Review comment:
       `testAutoCreateTable`

##########
File path: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
##########
@@ -252,4 +282,100 @@ public void testBatchSink() throws Exception {
 				Row.of("Bob", 1)
 		}, DB_URL, OUTPUT_TABLE3, new String[]{"NAME", "SCORE"});
 	}
+
+	@Test
+	public void testCreateTableIfNotExists() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+		env.getConfig().setParallelism(1);
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+		Table t = tEnv.fromDataStream(getFullTypeDataStream(env), "c0, c1, c2, c3, c4, c5, c6, c7, c8, c9");
+
+		tEnv.registerTable("T", t);
+
+		tEnv.sqlUpdate(
+			"CREATE TABLE upsertSink (" +
+				"  c0 BOOLEAN," +
+				"  c1 INTEGER," +
+				"  c2 BIGINT," +
+				"  c3 FLOAT," +
+				"  c4 DOUBLE," +
+				"  c5 DECIMAL," +
+				"  c6 STRING," +
+				"  c7 DATE," +
+				"  c8 TIME(0)," +
+				"  c9 TIMESTAMP(3)" +
+				") WITH (" +
+				"  'connector.type'='jdbc'," +
+				"  'connector.url'='" + DB_URL + "'," +
+				"  'connector.table'='" + OUTPUT_TABLE4 + "'," +
+				"  'connector.write.auto-create-table'='true'" +
+				")");
+
+		tEnv.sqlUpdate("INSERT INTO upsertSink " +
+			" SELECT c0, c1, c2, c3, c4, max(c5), max(c6), max(c7), max(c8), max(c9) " +
+			" FROM T WHERE c1 > 2" +
+			" group by c0, c1, c2, c3, c4");
+		tEnv.execute("testCreateTableIfNotExists");
+		check(new Row[] {
+				//JDBC treats SQL FLOAT/DOUBLE as java.lang.Double, convert float value to double before compare.
+				Row.of(false, 3, 1002L, Double.valueOf(0.1f), 12.26d, new BigDecimal("5.330000000000000000"), "To be or not to be 3",
+					Date.valueOf("2020-03-10"), Time.valueOf("00:00:03"), Timestamp.valueOf("2020-03-10 00:00:00.021")),
+				Row.of(false, 4, 1003L, Double.valueOf(0.11f), 12.27d, new BigDecimal("5.440000000000000000"), "To be or not to be 4",
+					Date.valueOf("2020-03-10"), Time.valueOf("00:00:04"), Timestamp.valueOf("2020-03-10 00:00:00.021"))},
+			DB_URL, OUTPUT_TABLE4, new String[]{"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"});
+
+		clearAutoCreatedTable();
+	}
+
+	@Test
+	public void testCreateTableIfNotExistsUseBlinkPlanner() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+		env.getConfig().setParallelism(1);
+		EnvironmentSettings setting = EnvironmentSettings.newInstance()
+			.useBlinkPlanner()
+			.inStreamingMode()
+			.build();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, setting);
+		Table t = tEnv.fromDataStream(getFullTypeDataStream(env), "c0, c1, c2, c3, c4, c5, c6, c7, c8, c9");
+
+		tEnv.registerTable("T", t);
+
+		tEnv.sqlUpdate(

Review comment:
       Add a `TODO` above this: we should test that the PRIMARY KEY is auto-created once PRIMARY KEY syntax is supported (FLINK-17030).

##########
File path: docs/dev/table/connect.md
##########
@@ -1325,6 +1325,10 @@ CREATE TABLE MyUserTable (
 
   -- optional, max retry times if writing records to database failed
   'connector.write.max-retries' = '3'
+  
+  -- optional, create jdbc table when writing data to database if the table does not exist.

Review comment:
       ```suggestion
     -- optional, automatically create jdbc table in database when writing data to database if the table does not exist. Default is false.
   ```

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
##########
@@ -146,6 +153,42 @@ default String getSelectFromStatement(String tableName, String[] selectFields, S
 				.map(f -> quoteIdentifier(f) + "=?")
 				.collect(Collectors.joining(" AND "));
 		return "SELECT " + selectExpressions + " FROM " +
-				quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+			quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+	}
+
+	/**
+	 * Get create table statement.
+	 */
+	default String getCreateTableStatement(String tableName, TableSchema schema, String[] primaryKeyFields) {
+		List<String> expressions = new ArrayList<>();
+		for (TableColumn column : schema.getTableColumns()) {
+			expressions.add(quoteIdentifier(column.getName()) + " " + getDialectTypeName(column.getType()));
+		}
+		if (primaryKeyFields != null && primaryKeyFields.length > 0) {
+			String primaryKey = "PRIMARY KEY" + String.format("(%s)", StringUtils.join(
+				Arrays.stream(primaryKeyFields)
+					.map(name -> quoteIdentifier(name))
+					.collect(Collectors.toList()),
+				","));
+			expressions.add(primaryKey);
+		}
+
+		return String.format("CREATE TABLE IF NOT EXISTS %s (%s)", quoteIdentifier(tableName),
+			StringUtils.join(expressions, ","));
+	}
+
+	/**
+	 * Get dialect type name from {@link DataType}.
+	 */
+	default String getDialectTypeName(DataType dataType) {

Review comment:
       I don't think we should have a default type name mapping. Almost every database vendor has totally different type names. We should have the mapping clearly defined in dialect implementations.

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
##########
@@ -211,6 +240,28 @@ public int minTimestampPrecision() {
 					LogicalTypeRoot.SYMBOL,
 					LogicalTypeRoot.UNRESOLVED);
 		}
+
+		@Override
+		public String getDialectTypeName(DataType dataType) {
+			switch (dataType.getLogicalType().getTypeRoot()) {
+				case VARCHAR:
+					final int len = ((VarCharType) dataType.getLogicalType()).getLength();
+					return String.format("VARCHAR(%d)", len > MAX_VARCHAR_LEN ? MAX_VARCHAR_LEN : len);
+				case DECIMAL:
+					if (dataType.getLogicalType() instanceof DecimalType) {
+						return dataType.toString();
+					}
+					// for legacy type
+					return "DECIMAL(" + MAX_DECIMAL_PRECISION + ", 18)";
+				// derby does not support time/timestamp type with precision
+				case TIME_WITHOUT_TIME_ZONE:
+					return "TIME";
+				case TIMESTAMP_WITHOUT_TIME_ZONE:
+					return "TIMESTAMP";
+				default:
+					return dataType.toString();

Review comment:
       Let's list the full mapping rather than falling back to the default `toString`, which is error-prone.
   You can have a look at `PostgresCatalog#fromJDBCType`.

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcDdlOptions.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.Serializable;
+
+import static org.apache.flink.api.java.io.jdbc.AbstractJdbcOutputFormat.DEFAULT_CREATE_TABLE_IF_NOT_EXISTS;
+
+/**
+ * Options for using JDBC DDL, like create table.
+ */
+public class JdbcDdlOptions implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	private final JDBCOptions jdbcOptions;
+	private final int maxRetry;
+	private final boolean createTableIfNotExists;

Review comment:
       Rename this variable to `autoCreateTable`. I think `IfNotExists` is redundant because the table is, of course, only created when it does not exist.

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcDdlOptions.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.Serializable;
+
+import static org.apache.flink.api.java.io.jdbc.AbstractJdbcOutputFormat.DEFAULT_CREATE_TABLE_IF_NOT_EXISTS;
+
+/**
+ * Options for using JDBC DDL, like create table.
+ */
+public class JdbcDdlOptions implements Serializable {

Review comment:
       I want to avoid introducing too many JDBC options. Can we merge this one into `JdbcDmlOptions`? In my understanding, this is a behavior of `INSERT INTO`.

##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
##########
@@ -146,6 +153,42 @@ default String getSelectFromStatement(String tableName, String[] selectFields, S
 				.map(f -> quoteIdentifier(f) + "=?")
 				.collect(Collectors.joining(" AND "));
 		return "SELECT " + selectExpressions + " FROM " +
-				quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+			quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+	}
+
+	/**
+	 * Get create table statement.
+	 */
+	default String getCreateTableStatement(String tableName, TableSchema schema, String[] primaryKeyFields) {

Review comment:
       The same applies to `getDialectTypeName`. My concern is that we should throw an `UnsupportedOperationException` if the dialect doesn't implement `autoCreateTable`, rather than an unreadable exception.

##########
File path: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
##########
@@ -252,4 +282,100 @@ public void testBatchSink() throws Exception {
 				Row.of("Bob", 1)
 		}, DB_URL, OUTPUT_TABLE3, new String[]{"NAME", "SCORE"});
 	}
+
+	@Test
+	public void testCreateTableIfNotExists() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+		env.getConfig().setParallelism(1);
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+		Table t = tEnv.fromDataStream(getFullTypeDataStream(env), "c0, c1, c2, c3, c4, c5, c6, c7, c8, c9");
+
+		tEnv.registerTable("T", t);
+
+		tEnv.sqlUpdate(
+			"CREATE TABLE upsertSink (" +
+				"  c0 BOOLEAN," +
+				"  c1 INTEGER," +
+				"  c2 BIGINT," +
+				"  c3 FLOAT," +
+				"  c4 DOUBLE," +
+				"  c5 DECIMAL," +
+				"  c6 STRING," +
+				"  c7 DATE," +
+				"  c8 TIME(0)," +
+				"  c9 TIMESTAMP(3)" +
+				") WITH (" +
+				"  'connector.type'='jdbc'," +
+				"  'connector.url'='" + DB_URL + "'," +
+				"  'connector.table'='" + OUTPUT_TABLE4 + "'," +
+				"  'connector.write.auto-create-table'='true'" +
+				")");
+
+		tEnv.sqlUpdate("INSERT INTO upsertSink " +
+			" SELECT c0, c1, c2, c3, c4, max(c5), max(c6), max(c7), max(c8), max(c9) " +
+			" FROM T WHERE c1 > 2" +
+			" group by c0, c1, c2, c3, c4");
+		tEnv.execute("testCreateTableIfNotExists");
+		check(new Row[] {
+				//JDBC treats SQL FLOAT/DOUBLE as java.lang.Double, convert float value to double before compare.
+				Row.of(false, 3, 1002L, Double.valueOf(0.1f), 12.26d, new BigDecimal("5.330000000000000000"), "To be or not to be 3",
+					Date.valueOf("2020-03-10"), Time.valueOf("00:00:03"), Timestamp.valueOf("2020-03-10 00:00:00.021")),
+				Row.of(false, 4, 1003L, Double.valueOf(0.11f), 12.27d, new BigDecimal("5.440000000000000000"), "To be or not to be 4",
+					Date.valueOf("2020-03-10"), Time.valueOf("00:00:04"), Timestamp.valueOf("2020-03-10 00:00:00.021"))},
+			DB_URL, OUTPUT_TABLE4, new String[]{"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"});
+
+		clearAutoCreatedTable();
+	}
+
+	@Test
+	public void testCreateTableIfNotExistsUseBlinkPlanner() throws Exception {

Review comment:
       Please use `Parameterized` to test against different planners to avoid duplicate code, or just test the Blink planner.

##########
File path: docs/dev/table/connect.md
##########
@@ -1325,6 +1325,10 @@ CREATE TABLE MyUserTable (
 
   -- optional, max retry times if writing records to database failed
   'connector.write.max-retries' = '3'

Review comment:
       ```suggestion
     'connector.write.max-retries' = '3',
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org