You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2021/02/25 17:48:07 UTC

[nifi] branch support/nifi-1.13 updated: NIFI-8259: Infer CSV field types as strings if no records are present

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch support/nifi-1.13
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.13 by this push:
     new bb77aed  NIFI-8259: Infer CSV field types as strings if no records are present
bb77aed is described below

commit bb77aedac2c6a7446b48d52e43a6dd0e692ff2df
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Thu Feb 25 11:30:33 2021 -0500

    NIFI-8259: Infer CSV field types as strings if no records are present
---
 .../nifi/processors/standard/TestQueryRecord.java  | 31 ++++++++++++++++++++++
 .../java/org/apache/nifi/csv/CSVRecordSource.java  |  4 +++
 .../org/apache/nifi/csv/CSVSchemaInference.java    | 10 ++++++-
 3 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index a2c2b19..a1b1b23 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -17,9 +17,12 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.csv.CSVReader;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -931,6 +934,34 @@ public class TestQueryRecord {
         out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
     }
 
+    /** Verifies that a header-only CSV flowfile (no records) can be queried when the reader infers its schema. */
+    @Test
+    public void testNoRecordsInput() throws InitializationException, IOException, SQLException {
+        TestRunner runner = getRunner();
+
+        CSVReader csvReader = new CSVReader();
+        runner.addControllerService("csv-reader", csvReader);
+        runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
+
+        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"age\"");
+
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(csvReader);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select name from FLOWFILE WHERE age > 23");
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "csv-reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(QueryRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "true");
+
+        // Header line only: the input carries field names but zero records
+        runner.enqueue("name,age\n");
+        runner.run();
+        runner.assertTransferCount(REL_NAME, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        out.assertContentEquals("\"name\",\"age\"\n");
+    }
+
 
     @Test
     public void testTransformCalc() throws InitializationException, IOException, SQLException {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java
index c27b579..69ef1d6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java
@@ -65,4 +65,8 @@ public class CSVRecordSource implements RecordSource<CSVRecordAndFieldNames> {
 
         return null;
     }
+
+    public List<String> getFieldNames() {
+        return this.fieldNames; // the field-name list held by this record source
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
index bc25744..5d195f3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
@@ -50,12 +50,20 @@ public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFie
         while (true) {
             final CSVRecordAndFieldNames recordAndFieldNames = recordSource.next();
             if (recordAndFieldNames == null) {
+                // If there are no records, assume the datatypes of all fields are strings
+                if (typeMap.isEmpty()) {
+                    if (recordSource instanceof CSVRecordSource) {
+                        CSVRecordSource csvRecordSource = (CSVRecordSource) recordSource;
+                        for (String fieldName : csvRecordSource.getFieldNames()) {
+                            typeMap.put(fieldName, new FieldTypeInference());
+                        }
+                    }
+                }
                 break;
             }
 
             inferSchema(recordAndFieldNames, typeMap);
         }
-
         return createSchema(typeMap);
     }