You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2018/01/09 03:09:53 UTC

nifi git commit: NIFI-4749: This closes #2386. Pass the record reader's schema along to the ResultSetRecordSet so that it is able to resolve the schema for Record fields

Repository: nifi
Updated Branches:
  refs/heads/master a7f1eb89c -> 953e922d3


NIFI-4749: This closes #2386. Pass the record reader's schema along to the ResultSetRecordSet so that it is able to resolve the schema for Record fields

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/953e922d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/953e922d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/953e922d

Branch: refs/heads/master
Commit: 953e922d324d37c3771f46a2f371026814d92282
Parents: a7f1eb8
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jan 8 16:18:34 2018 -0500
Committer: joewitt <jo...@apache.org>
Committed: Mon Jan 8 19:47:18 2018 -0700

----------------------------------------------------------------------
 .../record/ResultSetRecordSet.java              | 32 +++++++++++++++-----
 .../nifi/processors/standard/QueryRecord.java   | 18 ++++++++++-
 2 files changed, 41 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/953e922d/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 571bf77..a0f44e5 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -45,10 +46,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
     private final Set<String> rsColumnNames;
     private boolean moreRows;
 
-    public ResultSetRecordSet(final ResultSet rs) throws SQLException {
+    public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
         this.rs = rs;
         moreRows = rs.next();
-        this.schema = createSchema(rs);
+        this.schema = createSchema(rs, readerSchema);
 
         rsColumnNames = new HashSet<>();
         final ResultSetMetaData metadata = rs.getMetaData();
@@ -118,7 +119,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         return value;
     }
 
-    private static RecordSchema createSchema(final ResultSet rs) throws SQLException {
+    private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
         final ResultSetMetaData metadata = rs.getMetaData();
         final int numCols = metadata.getColumnCount();
         final List<RecordField> fields = new ArrayList<>(numCols);
@@ -127,7 +128,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             final int column = i + 1;
             final int sqlType = metadata.getColumnType(column);
 
-            final DataType dataType = getDataType(sqlType, rs, column);
+            final DataType dataType = getDataType(sqlType, rs, column, readerSchema);
             final String fieldName = metadata.getColumnLabel(column);
 
             final int nullableFlag = metadata.isNullable(column);
@@ -145,7 +146,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         return new SimpleRecordSchema(fields);
     }
 
-    private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException {
+    private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex, final RecordSchema readerSchema) throws SQLException {
         switch (sqlType) {
             case Types.ARRAY:
                 // The JDBC API does not allow us to know what the base type of an array is through the metadata.
@@ -168,12 +169,18 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             case Types.LONGVARBINARY:
             case Types.VARBINARY:
                 return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
-            case Types.OTHER:
+            case Types.OTHER: {
                 // If we have no records to inspect, we can't really know its schema so we simply use the default data type.
                 if (rs.isAfterLast()) {
                     return RecordFieldType.RECORD.getDataType();
                 }
 
+                final String columnName = rs.getMetaData().getColumnName(columnIndex);
+                Optional<DataType> dataType = readerSchema.getDataType(columnName);
+                if (dataType.isPresent()) {
+                    return dataType.get();
+                }
+
                 final Object obj = rs.getObject(columnIndex);
                 if (obj == null || !(obj instanceof Record)) {
                     final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
@@ -188,8 +195,16 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 final Record record = (Record) obj;
                 final RecordSchema recordSchema = record.getSchema();
                 return RecordFieldType.RECORD.getRecordDataType(recordSchema);
-            default:
+            }
+            default: {
+                final String columnName = rs.getMetaData().getColumnName(columnIndex);
+                Optional<DataType> dataType = readerSchema.getDataType(columnName);
+                if (dataType.isPresent()) {
+                    return dataType.get();
+                }
+
                 return getFieldType(sqlType).getDataType();
+            }
         }
     }
 
@@ -286,7 +301,8 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 return RecordFieldType.TIMESTAMP.getDataType();
             }
             if (valueToLookAt instanceof Record) {
-                return RecordFieldType.RECORD.getDataType();
+                final Record record = (Record) valueToLookAt;
+                return RecordFieldType.RECORD.getRecordDataType(record.getSchema());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/953e922d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index 5798323..50e5dd7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -74,6 +75,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.queryrecord.FlowFileTable;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -256,6 +258,20 @@ public class QueryRecord extends AbstractProcessor {
         final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
         final Set<FlowFile> createdFlowFiles = new HashSet<>();
 
+        // Determine the Record Reader's schema
+        final RecordSchema readerSchema;
+        try (final InputStream rawIn = session.read(original)) {
+            final Map<String, String> originalAttributes = original.getAttributes();
+            final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, getLogger());
+            final RecordSchema inputSchema = reader.getSchema();
+
+            readerSchema = recordSetWriterFactory.getSchema(originalAttributes, inputSchema);
+        } catch (final Exception e) {
+            getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
         // Determine the schema for writing the data
         final Map<String, String> originalAttributes = original.getAttributes();
         int recordsRead = 0;
@@ -294,7 +310,7 @@ public class QueryRecord extends AbstractProcessor {
                                 final RecordSchema writeSchema;
 
                                 try {
-                                    recordSet = new ResultSetRecordSet(rs);
+                                    recordSet = new ResultSetRecordSet(rs, readerSchema);
                                     final RecordSchema resultSetSchema = recordSet.getSchema();
                                     writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema);
                                 } catch (final SQLException | SchemaNotFoundException e) {