You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/02/17 01:58:18 UTC

[nifi] branch master updated: NIFI-6044: This closes #3314. Retain the input data's order in the CSV Reader's inferred schema

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 82f4415  NIFI-6044: This closes #3314. Retain the input data's order in the CSV Reader's inferred schema
82f4415 is described below

commit 82f44155f62b0d3f90f60cfd611c6ae12761c873
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Feb 15 20:43:46 2019 -0500

    NIFI-6044: This closes #3314. Retain the input data's order in the CSV Reader's inferred schema
    
    Signed-off-by: joewitt <jo...@apache.org>
---
 .../TestJsonInferenceSchemaRegistryService.groovy  |  1 +
 .../main/java/org/apache/nifi/csv/CSVReader.java   |  7 ++--
 .../apache/nifi/csv/CSVRecordAndFieldNames.java    | 39 ++++++++++++++++++++++
 .../java/org/apache/nifi/csv/CSVRecordSource.java  | 12 +++++--
 .../org/apache/nifi/csv/CSVSchemaInference.java    | 21 ++++++------
 .../apache/nifi/csv/TestCSVSchemaInference.java    |  7 ++++
 6 files changed, 69 insertions(+), 18 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
index c930452..19dd9a1 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
@@ -48,6 +48,7 @@ class TestJsonInferenceSchemaRegistryService {
         def schema = service.getSchema([:], json, null)
 
         Assert.assertNotNull(schema)
+        Assert.assertEquals(Arrays.asList("name", "age", "contact"), schema.getFieldNames());
         def name = schema.getField("name")
         def age  = schema.getField("age")
         def contact = schema.getField("contact")
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index af46147..1ae57b3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.csv;
 
 import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVRecord;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -32,8 +31,8 @@ import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
-import org.apache.nifi.schema.inference.SchemaInferenceEngine;
 import org.apache.nifi.schema.inference.RecordSourceFactory;
+import org.apache.nifi.schema.inference.SchemaInferenceEngine;
 import org.apache.nifi.schema.inference.SchemaInferenceUtil;
 import org.apache.nifi.schema.inference.TimeValueInference;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
@@ -149,8 +148,8 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
         if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
             return new CSVHeaderSchemaStrategy(context);
         } else if (allowableValue.equalsIgnoreCase(SchemaInferenceUtil.INFER_SCHEMA.getValue())) {
-            final RecordSourceFactory<CSVRecord> sourceFactory = (var, in) -> new CSVRecordSource(in, context);
-            final SchemaInferenceEngine<CSVRecord> inference = new CSVSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
+            final RecordSourceFactory<CSVRecordAndFieldNames> sourceFactory = (var, in) -> new CSVRecordSource(in, context);
+            final SchemaInferenceEngine<CSVRecordAndFieldNames> inference = new CSVSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
             return new InferSchemaAccessStrategy<>(sourceFactory, inference, getLogger());
         }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordAndFieldNames.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordAndFieldNames.java
new file mode 100644
index 0000000..b4413de
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordAndFieldNames.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVRecord;
+
+import java.util.List;
+
+public class CSVRecordAndFieldNames {
+    private final CSVRecord record;
+    private final List<String> fieldNames;
+
+    public CSVRecordAndFieldNames(final CSVRecord record, final List<String> fieldNames) {
+        this.record = record;
+        this.fieldNames = fieldNames;
+    }
+
+    public CSVRecord getRecord() {
+        return record;
+    }
+
+    public List<String> getFieldNames() {
+        return fieldNames;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java
index ab4362a..20a8407 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java
@@ -29,10 +29,14 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 
-public class CSVRecordSource implements RecordSource<CSVRecord> {
+public class CSVRecordSource implements RecordSource<CSVRecordAndFieldNames> {
     private final Iterator<CSVRecord> csvRecordIterator;
+    private final List<String> fieldNames;
 
     public CSVRecordSource(final InputStream in, final PropertyContext context) throws IOException {
         final String charset = context.getProperty(CSVUtils.CHARSET).getValue();
@@ -46,14 +50,16 @@ public class CSVRecordSource implements RecordSource<CSVRecord> {
 
         final CSVFormat csvFormat = CSVUtils.createCSVFormat(context).withFirstRecordAsHeader().withTrim();
         final CSVParser csvParser = new CSVParser(reader, csvFormat);
+        fieldNames = Collections.unmodifiableList(new ArrayList<>(csvParser.getHeaderMap().keySet()));
+
         csvRecordIterator = csvParser.iterator();
     }
 
     @Override
-    public CSVRecord next() {
+    public CSVRecordAndFieldNames next() {
         if (csvRecordIterator.hasNext()) {
             final CSVRecord record = csvRecordIterator.next();
-            return record;
+            return new CSVRecordAndFieldNames(record, fieldNames);
         }
 
         return null;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
index b81ddc0..10b18a9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
@@ -19,8 +19,8 @@ package org.apache.nifi.csv;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.nifi.schema.inference.FieldTypeInference;
-import org.apache.nifi.schema.inference.SchemaInferenceEngine;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.apache.nifi.schema.inference.SchemaInferenceEngine;
 import org.apache.nifi.schema.inference.TimeValueInference;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
@@ -35,7 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecord> {
+public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFieldNames> {
 
     private final TimeValueInference timeValueInference;
 
@@ -45,30 +45,29 @@ public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecord> {
 
 
     @Override
-    public RecordSchema inferSchema(final RecordSource<CSVRecord> recordSource) throws IOException {
+    public RecordSchema inferSchema(final RecordSource<CSVRecordAndFieldNames> recordSource) throws IOException {
         final Map<String, FieldTypeInference> typeMap = new LinkedHashMap<>();
         while (true) {
-            final CSVRecord rawRecord = recordSource.next();
-            if (rawRecord == null) {
+            final CSVRecordAndFieldNames recordAndFieldNames = recordSource.next();
+            if (recordAndFieldNames == null) {
                 break;
             }
 
-            inferSchema(rawRecord, typeMap);
+            inferSchema(recordAndFieldNames, typeMap);
         }
 
         return createSchema(typeMap);
     }
 
 
-    private void inferSchema(final CSVRecord csvRecord, final Map<String, FieldTypeInference> typeMap) {
-        final Map<String, String> values = csvRecord.toMap();
-        for (final Map.Entry<String, String> entry : values.entrySet()) {
-            final String value = entry.getValue();
+    private void inferSchema(final CSVRecordAndFieldNames recordAndFieldNames, final Map<String, FieldTypeInference> typeMap) {
+        final CSVRecord csvRecord = recordAndFieldNames.getRecord();
+        for (final String fieldName : recordAndFieldNames.getFieldNames()) {
+            final String value = csvRecord.get(fieldName);
             if (value == null) {
                 return;
             }
 
-            final String fieldName = entry.getKey();
             final FieldTypeInference typeInference = typeMap.computeIfAbsent(fieldName, key -> new FieldTypeInference());
             final String trimmed = trim(value);
             final DataType dataType = getDataType(trimmed);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
index ce801d6..51d3eb2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
@@ -32,7 +32,9 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -74,5 +76,10 @@ public class TestCSVSchemaInference {
 
         assertSame(RecordFieldType.INT, schema.getDataType("parentIds").get().getFieldType());
         assertSame(RecordFieldType.STRING, schema.getDataType("numeric string").get().getFieldType());
+
+        final List<String> fieldNames = schema.getFieldNames();
+        assertEquals(Arrays.asList("eventId", "eventOrdinal", "eventType", "timestampMillis", "timestamp", "eventDate", "eventTime", "maybeTime", "maybeDate", "durationMillis", "lineageStart",
+            "componentId", "componentType", "componentName", "processGroupId", "processGroupName", "entityId", "entityType", "entitySize", "previousEntitySize", "updatedAttributes", "actorHostname",
+                "contentURI", "previousContentURI", "parentIds", "childIds", "platform", "application", "extra field", "numeric string"), fieldNames);
     }
 }