You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/08/30 14:53:07 UTC

[2/2] flink git commit: [FLINK-7556] Allow Integer.MIN_VALUE for fetch size in JDBCInputFormat

[FLINK-7556] Allow Integer.MIN_VALUE for fetch size in JDBCInputFormat

Allow Integer.MIN_VALUE to be accepted as a parameter for setFetchSize for MySQL Driver.

The combination of a forward-only, read-only result set, with a fetch size of Integer.MIN_VALUE serves as a signal to the driver to stream result sets row-by-row. After this, any result sets created with the statement will be retrieved row-by-row.

This closes #4617.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e753db84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e753db84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e753db84

Branch: refs/heads/master
Commit: e753db8411debfc573ffc330355a0f24c0afbfb5
Parents: 1b7f8bd
Author: Nycholas de Oliveira e Oliveira <ny...@gmail.com>
Authored: Tue Aug 29 14:21:03 2017 -0300
Committer: zentol <ch...@apache.org>
Committed: Wed Aug 30 16:51:32 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/api/java/io/jdbc/JDBCInputFormat.java   |  5 +++--
 .../flink/api/java/io/jdbc/JDBCInputFormatTest.java      | 11 +++++++++++
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e753db84/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index b7ac744..7d08814 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -144,7 +144,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 				dbConn = DriverManager.getConnection(dbURL, username, password);
 			}
 			statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
-			if (fetchSize > 0) {
+			if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
 				statement.setFetchSize(fetchSize);
 			}
 		} catch (SQLException se) {
@@ -390,7 +390,8 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 		}
 
 		public JDBCInputFormatBuilder setFetchSize(int fetchSize) {
-			Preconditions.checkArgument(fetchSize > 0, "Illegal value %s for fetchSize, has to be positive.", fetchSize);
+			Preconditions.checkArgument(fetchSize == Integer.MIN_VALUE || fetchSize > 0,
+				"Illegal value %s for fetchSize, has to be positive or Integer.MIN_VALUE.", fetchSize);
 			format.fetchSize = fetchSize;
 			return this;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e753db84/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index f7a86e5..10e8c66 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -114,6 +114,17 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	}
 
 	@Test
+	public void testValidFetchSizeIntegerMin() {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+			.setDrivername(DRIVER_CLASS)
+			.setDBUrl(DB_URL)
+			.setQuery(SELECT_ALL_BOOKS)
+			.setRowTypeInfo(ROW_TYPE_INFO)
+			.setFetchSize(Integer.MIN_VALUE)
+			.finish();
+	}
+
+	@Test
 	public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
 			.setDrivername(DRIVER_CLASS)