Posted to commits@flink.apache.org by kk...@apache.org on 2020/03/10 16:16:59 UTC

[flink] 01/10: [FLINK-15782][connectors/jdbc] rename package-private JDBC classes

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dae17509743a46c8a30fb0e3def3b7e3a51ccba5
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Jan 24 20:01:12 2020 +0100

    [FLINK-15782][connectors/jdbc] rename package-private JDBC classes

    Motivation: future classes for JDBC XA and JDBC DML would make
    the all-uppercase "JDBC" naming convention inconvenient.
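
For illustration, the readability concern behind the rename shows up as soon as two acronyms meet in one class name. The snippet below uses hypothetical names (this commit adds no XA or DML classes) to contrast the two conventions:

    // Hypothetical future sink classes, shown only to illustrate the naming concern.
    class JDBCXASinkFunction { }   // old all-uppercase convention: "JDBCXA" blurs the word boundary
    class JdbcXaSinkFunction { }   // convention adopted here: "JdbcXa" keeps the boundary readable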
---
 ...utFormat.java => AbstractJdbcOutputFormat.java} | 12 +++-------
 .../api/java/io/jdbc/JDBCAppendTableSink.java      |  8 +++----
 .../java/io/jdbc/JDBCAppendTableSinkBuilder.java   |  4 ++--
 .../flink/api/java/io/jdbc/JDBCLookupFunction.java |  4 ++--
 .../flink/api/java/io/jdbc/JDBCLookupOptions.java  |  2 +-
 .../flink/api/java/io/jdbc/JDBCOutputFormat.java   |  2 +-
 .../flink/api/java/io/jdbc/JDBCTableSource.java    |  2 +-
 .../api/java/io/jdbc/JDBCUpsertTableSink.java      | 26 +++++++++++-----------
 ...JDBCSinkFunction.java => JdbcSinkFunction.java} |  4 ++--
 .../jdbc/{JDBCTypeUtil.java => JdbcTypeUtil.java}  |  4 ++--
 ...tputFormat.java => JdbcUpsertOutputFormat.java} | 14 ++++++------
 ...nkFunction.java => JdbcUpsertSinkFunction.java} |  6 ++---
 .../api/java/io/jdbc/JDBCAppendTableSinkTest.java  |  4 ++--
 .../java/io/jdbc/JDBCUpsertTableSinkITCase.java    |  2 +-
 ...JDBCTypeUtilTest.java => JdbcTypeUtilTest.java} |  4 ++--
 ...atTest.java => JdbcUpsertOutputFormatTest.java} |  8 +++----
 16 files changed, 50 insertions(+), 56 deletions(-)

diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJdbcOutputFormat.java
similarity index 89%
rename from flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJDBCOutputFormat.java
rename to flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJdbcOutputFormat.java
index 451baff..647c968 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/AbstractJdbcOutputFormat.java
@@ -30,19 +30,13 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 
-/**
- * OutputFormat to write Rows into a JDBC database.
- *
- * @see Row
- * @see DriverManager
- */
-public abstract class AbstractJDBCOutputFormat<T> extends RichOutputFormat<T> {
+abstract class AbstractJdbcOutputFormat<T> extends RichOutputFormat<T> {
 
 	private static final long serialVersionUID = 1L;
 	static final int DEFAULT_FLUSH_MAX_SIZE = 5000;
 	static final long DEFAULT_FLUSH_INTERVAL_MILLS = 0;
 
-	private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCOutputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcOutputFormat.class);
 
 	private final String username;
 	private final String password;
@@ -51,7 +45,7 @@ public abstract class AbstractJDBCOutputFormat<T> extends RichOutputFormat<T> {
 
 	protected transient Connection connection;
 
-	public AbstractJDBCOutputFormat(String username, String password, String drivername, String dbURL) {
+	public AbstractJdbcOutputFormat(String username, String password, String drivername, String dbURL) {
 		this.username = username;
 		this.password = password;
 		this.drivername = drivername;
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
index b5bc34c..4fa8c56 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
@@ -63,7 +63,7 @@ public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTab
 	@Override
 	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
 		return dataStream
-			.addSink(new JDBCSinkFunction(outputFormat))
+			.addSink(new JdbcSinkFunction(outputFormat))
 			.setParallelism(dataStream.getParallelism())
 			.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
 	}
@@ -93,16 +93,16 @@ public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTab
 		int[] types = outputFormat.getTypesArray();
 
 		String sinkSchema =
-			String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
+			String.join(", ", IntStream.of(types).mapToObj(JdbcTypeUtil::getTypeName).collect(Collectors.toList()));
 		String tableSchema =
-			String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
+			String.join(", ", Stream.of(fieldTypes).map(JdbcTypeUtil::getTypeName).collect(Collectors.toList()));
 		String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " +
 			"Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);
 
 		Preconditions.checkArgument(fieldTypes.length == types.length, msg);
 		for (int i = 0; i < types.length; ++i) {
 			Preconditions.checkArgument(
-				JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
+				JdbcTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
 				msg);
 		}
 
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
index 023c624..ec966ac 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.java.io.jdbc;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.util.Preconditions;
 
-import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_MAX_SIZE;
+import static org.apache.flink.api.java.io.jdbc.AbstractJdbcOutputFormat.DEFAULT_FLUSH_MAX_SIZE;
 
 /**
  * A builder to configure and build the JDBCAppendTableSink.
@@ -100,7 +100,7 @@ public class JDBCAppendTableSinkBuilder {
 	public JDBCAppendTableSinkBuilder setParameterTypes(TypeInformation<?>... types) {
 		int[] ty = new int[types.length];
 		for (int i = 0; i < types.length; ++i) {
-			ty[i] = JDBCTypeUtil.typeInformationToSqlType(types[i]);
+			ty[i] = JdbcTypeUtil.typeInformationToSqlType(types[i]);
 		}
 		this.parameterTypes = ty;
 		return this;
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
index 80d6127..4cfa498 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
@@ -99,8 +99,8 @@ public class JDBCLookupFunction extends TableFunction<Row> {
 		this.cacheMaxSize = lookupOptions.getCacheMaxSize();
 		this.cacheExpireMs = lookupOptions.getCacheExpireMs();
 		this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
-		this.keySqlTypes = Arrays.stream(keyTypes).mapToInt(JDBCTypeUtil::typeInformationToSqlType).toArray();
-		this.outputSqlTypes = Arrays.stream(fieldTypes).mapToInt(JDBCTypeUtil::typeInformationToSqlType).toArray();
+		this.keySqlTypes = Arrays.stream(keyTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
+		this.outputSqlTypes = Arrays.stream(fieldTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
 		this.query = options.getDialect().getSelectFromStatement(
 				options.getTableName(), fieldNames, keyNames);
 	}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java
index 6137324..6a35e8e 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.java.io.jdbc;
 import java.io.Serializable;
 import java.util.Objects;
 
-import static org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.DEFAULT_MAX_RETRY_TIMES;
+import static org.apache.flink.api.java.io.jdbc.JdbcUpsertOutputFormat.DEFAULT_MAX_RETRY_TIMES;
 
 /**
  * Options for the JDBC lookup.
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index 0893d7f..4b202ae 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -37,7 +37,7 @@ import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
  * @see Row
  * @see DriverManager
  */
-public class JDBCOutputFormat extends AbstractJDBCOutputFormat<Row> {
+public class JDBCOutputFormat extends AbstractJdbcOutputFormat<Row> {
 
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
index bbd5561..60339e2 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
@@ -37,7 +37,7 @@ import org.apache.flink.types.Row;
 import java.util.Arrays;
 import java.util.Objects;
 
-import static org.apache.flink.api.java.io.jdbc.JDBCTypeUtil.normalizeTableSchema;
+import static org.apache.flink.api.java.io.jdbc.JdbcTypeUtil.normalizeTableSchema;
 import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
index 77dac7b..5ba385d 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
@@ -36,10 +36,10 @@ import org.apache.flink.types.Row;
 import java.util.Arrays;
 import java.util.Objects;
 
-import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_INTERVAL_MILLS;
-import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_MAX_SIZE;
-import static org.apache.flink.api.java.io.jdbc.JDBCTypeUtil.normalizeTableSchema;
-import static org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.DEFAULT_MAX_RETRY_TIMES;
+import static org.apache.flink.api.java.io.jdbc.AbstractJdbcOutputFormat.DEFAULT_FLUSH_INTERVAL_MILLS;
+import static org.apache.flink.api.java.io.jdbc.AbstractJdbcOutputFormat.DEFAULT_FLUSH_MAX_SIZE;
+import static org.apache.flink.api.java.io.jdbc.JdbcTypeUtil.normalizeTableSchema;
+import static org.apache.flink.api.java.io.jdbc.JdbcUpsertOutputFormat.DEFAULT_MAX_RETRY_TIMES;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -57,11 +57,11 @@ public class JDBCUpsertTableSink implements UpsertStreamTableSink<Row> {
 	private boolean isAppendOnly;
 
 	private JDBCUpsertTableSink(
-		TableSchema schema,
-		JDBCOptions options,
-		int flushMaxSize,
-		long flushIntervalMills,
-		int maxRetryTime) {
+			TableSchema schema,
+			JDBCOptions options,
+			int flushMaxSize,
+			long flushIntervalMills,
+			int maxRetryTime) {
 		this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
 		this.options = options;
 		this.flushMaxSize = flushMaxSize;
@@ -69,16 +69,16 @@ public class JDBCUpsertTableSink implements UpsertStreamTableSink<Row> {
 		this.maxRetryTime = maxRetryTime;
 	}
 
-	private JDBCUpsertOutputFormat newFormat() {
+	private JdbcUpsertOutputFormat newFormat() {
 		if (!isAppendOnly && (keyFields == null || keyFields.length == 0)) {
 			throw new UnsupportedOperationException("JDBCUpsertTableSink can not support ");
 		}
 
 		// sql types
 		int[] jdbcSqlTypes = Arrays.stream(schema.getFieldTypes())
-				.mapToInt(JDBCTypeUtil::typeInformationToSqlType).toArray();
+				.mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
 
-		return JDBCUpsertOutputFormat.builder()
+		return JdbcUpsertOutputFormat.builder()
 			.setOptions(options)
 			.setFieldNames(schema.getFieldNames())
 			.setFlushMaxSize(flushMaxSize)
@@ -92,7 +92,7 @@ public class JDBCUpsertTableSink implements UpsertStreamTableSink<Row> {
 	@Override
 	public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
 		return dataStream
-				.addSink(new JDBCUpsertSinkFunction(newFormat()))
+				.addSink(new JdbcUpsertSinkFunction(newFormat()))
 				.setParallelism(dataStream.getParallelism())
 				.name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames()));
 	}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcSinkFunction.java
similarity index 94%
rename from flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
rename to flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcSinkFunction.java
index d2fdef6..da3f611 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcSinkFunction.java
@@ -26,10 +26,10 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.types.Row;
 
-class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
+class JdbcSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
 	final JDBCOutputFormat outputFormat;
 
-	JDBCSinkFunction(JDBCOutputFormat outputFormat) {
+	JdbcSinkFunction(JDBCOutputFormat outputFormat) {
 		this.outputFormat = outputFormat;
 	}
 
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcTypeUtil.java
similarity index 99%
rename from flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
rename to flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcTypeUtil.java
index db5b56f..d3b2bb9 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcTypeUtil.java
@@ -45,7 +45,7 @@ import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
 
-class JDBCTypeUtil {
+class JdbcTypeUtil {
 	private static final Map<TypeInformation<?>, Integer> TYPE_MAPPING;
 	private static final Map<Integer, String> SQL_TYPE_NAMES;
 
@@ -84,7 +84,7 @@ class JDBCTypeUtil {
 		SQL_TYPE_NAMES = Collections.unmodifiableMap(names);
 	}
 
-	private JDBCTypeUtil() {
+	private JdbcTypeUtil() {
 	}
 
 	static int typeInformationToSqlType(TypeInformation<?> type) {
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcUpsertOutputFormat.java
similarity index 95%
rename from flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
rename to flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcUpsertOutputFormat.java
index 1736c28..48767d4 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcUpsertOutputFormat.java
@@ -41,11 +41,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * An upsert OutputFormat for JDBC.
  */
-public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Boolean, Row>> {
+class JdbcUpsertOutputFormat extends AbstractJdbcOutputFormat<Tuple2<Boolean, Row>> {
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(JDBCUpsertOutputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(JdbcUpsertOutputFormat.class);
 
 	static final int DEFAULT_MAX_RETRY_TIMES = 3;
 
@@ -68,7 +68,7 @@ public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Bool
 	private transient ScheduledFuture scheduledFuture;
 	private transient volatile Exception flushException;
 
-	public JDBCUpsertOutputFormat(
+	public JdbcUpsertOutputFormat(
 			JDBCOptions options,
 			String[] fieldNames,
 			String[] keyFields,
@@ -117,7 +117,7 @@ public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Bool
 			this.scheduler = Executors.newScheduledThreadPool(
 					1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
 			this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
-				synchronized (JDBCUpsertOutputFormat.this) {
+				synchronized (JdbcUpsertOutputFormat.this) {
 					if (closed) {
 						return;
 					}
@@ -212,7 +212,7 @@ public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Bool
 	}
 
 	/**
-	 * Builder for a {@link JDBCUpsertOutputFormat}.
+	 * Builder for a {@link JdbcUpsertOutputFormat}.
 	 */
 	public static class Builder {
 		private JDBCOptions options;
@@ -285,10 +285,10 @@ public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Bool
 		 *
 		 * @return Configured JDBCUpsertOutputFormat
 		 */
-		public JDBCUpsertOutputFormat build() {
+		public JdbcUpsertOutputFormat build() {
 			checkNotNull(options, "No options supplied.");
 			checkNotNull(fieldNames, "No fieldNames supplied.");
-			return new JDBCUpsertOutputFormat(
+			return new JdbcUpsertOutputFormat(
 				options, fieldNames, keyFields, fieldTypes, flushMaxSize, flushIntervalMills, maxRetryTimes);
 		}
 	}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertSinkFunction.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcUpsertSinkFunction.java
similarity index 92%
rename from flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertSinkFunction.java
rename to flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcUpsertSinkFunction.java
index 3586008..ef668fb 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertSinkFunction.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JdbcUpsertSinkFunction.java
@@ -27,10 +27,10 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.types.Row;
 
-class JDBCUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> implements CheckpointedFunction {
-	private final JDBCUpsertOutputFormat outputFormat;
+class JdbcUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> implements CheckpointedFunction {
+	private final JdbcUpsertOutputFormat outputFormat;
 
-	JDBCUpsertSinkFunction(JDBCUpsertOutputFormat outputFormat) {
+	JdbcUpsertSinkFunction(JdbcUpsertOutputFormat outputFormat) {
 		this.outputFormat = outputFormat;
 	}
 
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
index b3f2b24..f76ca62 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
@@ -71,9 +71,9 @@ public class JDBCAppendTableSinkTest {
 				.getStreamGraph(StreamExecutionEnvironment.DEFAULT_JOB_NAME, false)
 				.getStreamNode(sinkId)
 				.getOperator();
-		assertTrue(planSink.getUserFunction() instanceof JDBCSinkFunction);
+		assertTrue(planSink.getUserFunction() instanceof JdbcSinkFunction);
 
-		JDBCSinkFunction sinkFunction = (JDBCSinkFunction) planSink.getUserFunction();
+		JdbcSinkFunction sinkFunction = (JdbcSinkFunction) planSink.getUserFunction();
 		assertSame(sink.getOutputFormat(), sinkFunction.outputFormat);
 	}
 
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
index 240c45d..901138e 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
@@ -42,7 +42,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.api.java.io.jdbc.JDBCTestBase.DRIVER_CLASS;
-import static org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormatTest.check;
+import static org.apache.flink.api.java.io.jdbc.JdbcUpsertOutputFormatTest.check;
 
 /**
  * IT case for {@link JDBCUpsertTableSink}.
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTypeUtilTest.java
similarity index 94%
rename from flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java
rename to flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTypeUtilTest.java
index 790be78a..6bb17d1 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTypeUtilTest.java
@@ -26,14 +26,14 @@ import org.junit.Test;
 
 import java.sql.Types;
 
-import static org.apache.flink.api.java.io.jdbc.JDBCTypeUtil.typeInformationToSqlType;
+import static org.apache.flink.api.java.io.jdbc.JdbcTypeUtil.typeInformationToSqlType;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
  * Testing the type conversions from Flink to SQL types.
  */
-public class JDBCTypeUtilTest {
+public class JdbcTypeUtilTest {
 
 	@Test
 	public void testTypeConversions() {
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcUpsertOutputFormatTest.java
similarity index 95%
rename from flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormatTest.java
rename to flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcUpsertOutputFormatTest.java
index 225ab1a..b0ff204 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcUpsertOutputFormatTest.java
@@ -43,11 +43,11 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.mockito.Mockito.doReturn;
 
 /**
- * Tests for the {@link JDBCUpsertOutputFormat}.
+ * Tests for the {@link JdbcUpsertOutputFormat}.
  */
-public class JDBCUpsertOutputFormatTest extends JDBCTestBase {
+public class JdbcUpsertOutputFormatTest extends JDBCTestBase {
 
-	private JDBCUpsertOutputFormat format;
+	private JdbcUpsertOutputFormat format;
 	private String[] fieldNames;
 	private String[] keyFields;
 
@@ -59,7 +59,7 @@ public class JDBCUpsertOutputFormatTest extends JDBCTestBase {
 
 	@Test
 	public void testJDBCOutputFormat() throws Exception {
-		format = JDBCUpsertOutputFormat.builder()
+		format = JdbcUpsertOutputFormat.builder()
 				.setOptions(JDBCOptions.builder()
 						.setDBUrl(DB_URL)
 						.setTableName(OUTPUT_TABLE)