You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2018/01/09 03:09:53 UTC
nifi git commit: NIFI-4749: This closes #2386. Pass the record
reader's schema along to the ResultSetRecordSet so that it is able to resolve
the schema for Record fields
Repository: nifi
Updated Branches:
refs/heads/master a7f1eb89c -> 953e922d3
NIFI-4749: This closes #2386. Pass the record reader's schema along to the ResultSetRecordSet so that it is able to resolve the schema for Record fields
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/953e922d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/953e922d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/953e922d
Branch: refs/heads/master
Commit: 953e922d324d37c3771f46a2f371026814d92282
Parents: a7f1eb8
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jan 8 16:18:34 2018 -0500
Committer: joewitt <jo...@apache.org>
Committed: Mon Jan 8 19:47:18 2018 -0700
----------------------------------------------------------------------
.../record/ResultSetRecordSet.java | 32 +++++++++++++++-----
.../nifi/processors/standard/QueryRecord.java | 18 ++++++++++-
2 files changed, 41 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/953e922d/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 571bf77..a0f44e5 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -45,10 +46,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
private final Set<String> rsColumnNames;
private boolean moreRows;
- public ResultSetRecordSet(final ResultSet rs) throws SQLException {
+ public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
this.rs = rs;
moreRows = rs.next();
- this.schema = createSchema(rs);
+ this.schema = createSchema(rs, readerSchema);
rsColumnNames = new HashSet<>();
final ResultSetMetaData metadata = rs.getMetaData();
@@ -118,7 +119,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return value;
}
- private static RecordSchema createSchema(final ResultSet rs) throws SQLException {
+ private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
final ResultSetMetaData metadata = rs.getMetaData();
final int numCols = metadata.getColumnCount();
final List<RecordField> fields = new ArrayList<>(numCols);
@@ -127,7 +128,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final int column = i + 1;
final int sqlType = metadata.getColumnType(column);
- final DataType dataType = getDataType(sqlType, rs, column);
+ final DataType dataType = getDataType(sqlType, rs, column, readerSchema);
final String fieldName = metadata.getColumnLabel(column);
final int nullableFlag = metadata.isNullable(column);
@@ -145,7 +146,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return new SimpleRecordSchema(fields);
}
- private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException {
+ private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex, final RecordSchema readerSchema) throws SQLException {
switch (sqlType) {
case Types.ARRAY:
// The JDBC API does not allow us to know what the base type of an array is through the metadata.
@@ -168,12 +169,18 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
case Types.LONGVARBINARY:
case Types.VARBINARY:
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
- case Types.OTHER:
+ case Types.OTHER: {
// If we have no records to inspect, we can't really know its schema so we simply use the default data type.
if (rs.isAfterLast()) {
return RecordFieldType.RECORD.getDataType();
}
+ final String columnName = rs.getMetaData().getColumnName(columnIndex);
+ Optional<DataType> dataType = readerSchema.getDataType(columnName);
+ if (dataType.isPresent()) {
+ return dataType.get();
+ }
+
final Object obj = rs.getObject(columnIndex);
if (obj == null || !(obj instanceof Record)) {
final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
@@ -188,8 +195,16 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final Record record = (Record) obj;
final RecordSchema recordSchema = record.getSchema();
return RecordFieldType.RECORD.getRecordDataType(recordSchema);
- default:
+ }
+ default: {
+ final String columnName = rs.getMetaData().getColumnName(columnIndex);
+ Optional<DataType> dataType = readerSchema.getDataType(columnName);
+ if (dataType.isPresent()) {
+ return dataType.get();
+ }
+
return getFieldType(sqlType).getDataType();
+ }
}
}
@@ -286,7 +301,8 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return RecordFieldType.TIMESTAMP.getDataType();
}
if (valueToLookAt instanceof Record) {
- return RecordFieldType.RECORD.getDataType();
+ final Record record = (Record) valueToLookAt;
+ return RecordFieldType.RECORD.getRecordDataType(record.getSchema());
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/953e922d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index 5798323..50e5dd7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import java.io.Closeable;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -74,6 +75,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.queryrecord.FlowFileTable;
import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -256,6 +258,20 @@ public class QueryRecord extends AbstractProcessor {
final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
final Set<FlowFile> createdFlowFiles = new HashSet<>();
+ // Determine the Record Reader's schema
+ final RecordSchema readerSchema;
+ try (final InputStream rawIn = session.read(original)) {
+ final Map<String, String> originalAttributes = original.getAttributes();
+ final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, getLogger());
+ final RecordSchema inputSchema = reader.getSchema();
+
+ readerSchema = recordSetWriterFactory.getSchema(originalAttributes, inputSchema);
+ } catch (final Exception e) {
+ getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
+ session.transfer(original, REL_FAILURE);
+ return;
+ }
+
// Determine the schema for writing the data
final Map<String, String> originalAttributes = original.getAttributes();
int recordsRead = 0;
@@ -294,7 +310,7 @@ public class QueryRecord extends AbstractProcessor {
final RecordSchema writeSchema;
try {
- recordSet = new ResultSetRecordSet(rs);
+ recordSet = new ResultSetRecordSet(rs, readerSchema);
final RecordSchema resultSetSchema = recordSet.getSchema();
writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema);
} catch (final SQLException | SchemaNotFoundException e) {