You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/08/30 14:53:07 UTC
[2/2] flink git commit: [FLINK-7556] Allow Integer.MIN_VALUE for fetch size in JDBCInputFormat
[FLINK-7556] Allow Integer.MIN_VALUE for fetch size in JDBCInputFormat
Allow Integer.MIN_VALUE to be accepted as a parameter for setFetchSize, for use with the MySQL driver.
With the MySQL driver, the combination of a forward-only, read-only result set and a fetch size of Integer.MIN_VALUE serves as a signal to the driver to stream result sets row-by-row. After this, any result sets created with the statement will be retrieved row-by-row.
This closes #4617.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e753db84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e753db84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e753db84
Branch: refs/heads/master
Commit: e753db8411debfc573ffc330355a0f24c0afbfb5
Parents: 1b7f8bd
Author: Nycholas de Oliveira e Oliveira <ny...@gmail.com>
Authored: Tue Aug 29 14:21:03 2017 -0300
Committer: zentol <ch...@apache.org>
Committed: Wed Aug 30 16:51:32 2017 +0200
----------------------------------------------------------------------
.../apache/flink/api/java/io/jdbc/JDBCInputFormat.java | 5 +++--
.../flink/api/java/io/jdbc/JDBCInputFormatTest.java | 11 +++++++++++
2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e753db84/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index b7ac744..7d08814 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -144,7 +144,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
dbConn = DriverManager.getConnection(dbURL, username, password);
}
statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
- if (fetchSize > 0) {
+ if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
statement.setFetchSize(fetchSize);
}
} catch (SQLException se) {
@@ -390,7 +390,8 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
}
public JDBCInputFormatBuilder setFetchSize(int fetchSize) {
- Preconditions.checkArgument(fetchSize > 0, "Illegal value %s for fetchSize, has to be positive.", fetchSize);
+ Preconditions.checkArgument(fetchSize == Integer.MIN_VALUE || fetchSize > 0,
+ "Illegal value %s for fetchSize, has to be positive or Integer.MIN_VALUE.", fetchSize);
format.fetchSize = fetchSize;
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e753db84/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index f7a86e5..10e8c66 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -114,6 +114,17 @@ public class JDBCInputFormatTest extends JDBCTestBase {
}
@Test
+ public void testValidFetchSizeIntegerMin() {
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(ROW_TYPE_INFO)
+ .setFetchSize(Integer.MIN_VALUE)
+ .finish();
+ }
+
+ @Test
public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(DRIVER_CLASS)