You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/11/27 18:25:46 UTC
[nifi] branch main updated: NIFI-8046: Fix issue with
ResultSetRecordSet on DB2
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fe95013 NIFI-8046: Fix issue with ResultSetRecordSet on DB2
fe95013 is described below
commit fe950131c35756dabd677fb21b436a1f85eabced
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Nov 27 19:23:44 2020 +0100
NIFI-8046: Fix issue with ResultSetRecordSet on DB2
This closes #8046.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../serialization/record/ResultSetRecordSet.java | 22 +++++++++++++---------
.../standard/AbstractQueryDatabaseTable.java | 2 +-
2 files changed, 14 insertions(+), 10 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index e6425af..ff0d142 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -58,14 +58,17 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
this.rs = rs;
- moreRows = rs.next();
- this.schema = createSchema(rs, readerSchema);
-
- rsColumnNames = new HashSet<>();
- final ResultSetMetaData metadata = rs.getMetaData();
- for (int i = 0; i < metadata.getColumnCount(); i++) {
- rsColumnNames.add(metadata.getColumnLabel(i + 1));
+ this.rsColumnNames = new HashSet<>();
+ RecordSchema tempSchema;
+ try {
+ tempSchema = createSchema(rs, readerSchema);
+ moreRows = rs.next();
+ } catch(SQLException se) {
+ // Tried to create the schema with a ResultSet without calling next() first (probably for DB2), now try the other way around
+ moreRows = rs.next();
+ tempSchema = createSchema(rs, readerSchema);
}
+ this.schema = tempSchema;
}
@Override
@@ -91,7 +94,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
try {
if (moreRows) {
final Record record = createRecord(rs);
- moreRows = rs.next();
+ moreRows = !rs.isClosed() && rs.next();
return record;
} else {
return null;
@@ -146,7 +149,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return value;
}
- private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
+ private RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
final ResultSetMetaData metadata = rs.getMetaData();
final int numCols = metadata.getColumnCount();
final List<RecordField> fields = new ArrayList<>(numCols);
@@ -168,6 +171,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final RecordField field = new RecordField(fieldName, dataType, nullable);
fields.add(field);
+ rsColumnNames.add(metadata.getColumnLabel(column));
}
return new SimpleRecordSchema(fields);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index 1df0ae2..2310664 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -316,7 +316,7 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
fileToProcess = session.putAllAttributes(fileToProcess, attributesToAdd);
sqlWriter.updateCounters(session);
- logger.info("{} contains {} records; transferring to 'success'",
+ logger.debug("{} contains {} records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));