You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/13 12:26:12 UTC
[flink] branch release-1.9 updated: [FLINK-13490][jdbc] Fix return
null in JDBCUtils::getFieldFromResultSet
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 82667a8 [FLINK-13490][jdbc] Fix return null in JDBCUtils::getFieldFromResultSet
82667a8 is described below
commit 82667a816f31d20da93eb4464d01fc9fe2fb1bc5
Author: TsReaper <ts...@gmail.com>
AuthorDate: Tue Jul 30 08:41:08 2019 +0800
[FLINK-13490][jdbc] Fix return null in JDBCUtils::getFieldFromResultSet
---
.../apache/flink/api/java/io/jdbc/JDBCUtils.java | 59 ++++++++++++++--------
.../api/java/io/jdbc/JDBCLookupFunctionITCase.java | 33 +++++++++---
2 files changed, 63 insertions(+), 29 deletions(-)
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
index 9bc948a..d50b82e 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
@@ -152,53 +152,64 @@ public class JDBCUtils {
}
public static Object getFieldFromResultSet(int index, int type, ResultSet set) throws SQLException {
- if (set.wasNull()) {
- return null;
- }
-
+ Object ret;
switch (type) {
case java.sql.Types.NULL:
- return null;
+ ret = null;
+ break;
case java.sql.Types.BOOLEAN:
case java.sql.Types.BIT:
- return set.getBoolean(index + 1);
+ ret = set.getBoolean(index + 1);
+ break;
case java.sql.Types.CHAR:
case java.sql.Types.NCHAR:
case java.sql.Types.VARCHAR:
case java.sql.Types.LONGVARCHAR:
case java.sql.Types.LONGNVARCHAR:
- return set.getString(index + 1);
+ ret = set.getString(index + 1);
+ break;
case java.sql.Types.TINYINT:
- return set.getByte(index + 1);
+ ret = set.getByte(index + 1);
+ break;
case java.sql.Types.SMALLINT:
- return set.getShort(index + 1);
+ ret = set.getShort(index + 1);
+ break;
case java.sql.Types.INTEGER:
- return set.getInt(index + 1);
+ ret = set.getInt(index + 1);
+ break;
case java.sql.Types.BIGINT:
- return set.getLong(index + 1);
+ ret = set.getLong(index + 1);
+ break;
case java.sql.Types.REAL:
- return set.getFloat(index + 1);
+ ret = set.getFloat(index + 1);
+ break;
case java.sql.Types.FLOAT:
case java.sql.Types.DOUBLE:
- return set.getDouble(index + 1);
+ ret = set.getDouble(index + 1);
+ break;
case java.sql.Types.DECIMAL:
case java.sql.Types.NUMERIC:
- return set.getBigDecimal(index + 1);
+ ret = set.getBigDecimal(index + 1);
+ break;
case java.sql.Types.DATE:
- return set.getDate(index + 1);
+ ret = set.getDate(index + 1);
+ break;
case java.sql.Types.TIME:
- return set.getTime(index + 1);
+ ret = set.getTime(index + 1);
+ break;
case java.sql.Types.TIMESTAMP:
- return set.getTimestamp(index + 1);
+ ret = set.getTimestamp(index + 1);
+ break;
case java.sql.Types.BINARY:
case java.sql.Types.VARBINARY:
case java.sql.Types.LONGVARBINARY:
- return set.getBytes(index + 1);
+ ret = set.getBytes(index + 1);
+ break;
default:
- Object value = set.getObject(index + 1);
+ ret = set.getObject(index + 1);
LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to get its value: {}.",
- type, index + 1, value);
- return value;
+ type, index + 1, ret);
+ break;
// case java.sql.Types.SQLXML
// case java.sql.Types.ARRAY:
@@ -213,5 +224,11 @@ public class JDBCUtils {
// case java.sql.Types.ROWID:
// case java.sql.Types.STRUC
}
+
+ if (set.wasNull()) {
+ return null;
+ } else {
+ return ret;
+ }
}
}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
index 3d66001..16a0eeb 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
@@ -84,20 +84,37 @@ public class JDBCLookupFunctionITCase extends AbstractTestBase {
Object[][] data = new Object[][] {
new Object[] {1, 1, "11-c1-v1", "11-c2-v1"},
new Object[] {1, 1, "11-c1-v2", "11-c2-v2"},
- new Object[] {2, 3, "23-c1", "23-c2"},
+ new Object[] {2, 3, null, "23-c2"},
new Object[] {2, 5, "25-c1", "25-c2"},
new Object[] {3, 8, "38-c1", "38-c2"}
};
+ boolean[] surroundedByQuotes = new boolean[] {
+ false, false, true, true
+ };
+
StringBuilder sqlQueryBuilder = new StringBuilder(
"INSERT INTO " + LOOKUP_TABLE + " (id1, id2, comment1, comment2) VALUES ");
for (int i = 0; i < data.length; i++) {
- sqlQueryBuilder.append("(")
- .append(data[i][0]).append(",")
- .append(data[i][1]).append(",'")
- .append(data[i][2]).append("','")
- .append(data[i][3]).append("')");
+ sqlQueryBuilder.append("(");
+ for (int j = 0; j < data[i].length; j++) {
+ if (data[i][j] == null) {
+ sqlQueryBuilder.append("null");
+ } else {
+ if (surroundedByQuotes[j]) {
+ sqlQueryBuilder.append("'");
+ }
+ sqlQueryBuilder.append(data[i][j]);
+ if (surroundedByQuotes[j]) {
+ sqlQueryBuilder.append("'");
+ }
+ }
+ if (j < data[i].length - 1) {
+ sqlQueryBuilder.append(", ");
+ }
+ }
+ sqlQueryBuilder.append(")");
if (i < data.length - 1) {
- sqlQueryBuilder.append(",");
+ sqlQueryBuilder.append(", ");
}
}
stat.execute(sqlQueryBuilder.toString());
@@ -160,7 +177,7 @@ public class JDBCLookupFunctionITCase extends AbstractTestBase {
expected.add("1,1,11-c1-v1,11-c2-v1");
expected.add("1,1,11-c1-v2,11-c2-v2");
expected.add("1,1,11-c1-v2,11-c2-v2");
- expected.add("2,3,23-c1,23-c2");
+ expected.add("2,3,null,23-c2");
expected.add("2,5,25-c1,25-c2");
expected.add("3,8,38-c1,38-c2");