You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/02/17 01:58:18 UTC
[nifi] branch master updated: NIFI-6044: This closes #3314. Retain the input data's order in the CSV Reader's inferred schema
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 82f4415 NIFI-6044: This closes #3314. Retain the input data's order in the CSV Reader's inferred schema
82f4415 is described below
commit 82f44155f62b0d3f90f60cfd611c6ae12761c873
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Feb 15 20:43:46 2019 -0500
NIFI-6044: This closes #3314. Retain the input data's order in the CSV Reader's inferred schema
Signed-off-by: joewitt <jo...@apache.org>
---
.../TestJsonInferenceSchemaRegistryService.groovy | 1 +
.../main/java/org/apache/nifi/csv/CSVReader.java | 7 ++--
.../apache/nifi/csv/CSVRecordAndFieldNames.java | 39 ++++++++++++++++++++++
.../java/org/apache/nifi/csv/CSVRecordSource.java | 12 +++++--
.../org/apache/nifi/csv/CSVSchemaInference.java | 21 ++++++------
.../apache/nifi/csv/TestCSVSchemaInference.java | 7 ++++
6 files changed, 69 insertions(+), 18 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
index c930452..19dd9a1 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
@@ -48,6 +48,7 @@ class TestJsonInferenceSchemaRegistryService {
def schema = service.getSchema([:], json, null)
Assert.assertNotNull(schema)
+ Assert.assertEquals(Arrays.asList("name", "age", "contact"), schema.getFieldNames());
def name = schema.getField("name")
def age = schema.getField("age")
def contact = schema.getField("contact")
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index af46147..1ae57b3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -18,7 +18,6 @@
package org.apache.nifi.csv;
import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVRecord;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -32,8 +31,8 @@ import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
-import org.apache.nifi.schema.inference.SchemaInferenceEngine;
import org.apache.nifi.schema.inference.RecordSourceFactory;
+import org.apache.nifi.schema.inference.SchemaInferenceEngine;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
@@ -149,8 +148,8 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
return new CSVHeaderSchemaStrategy(context);
} else if (allowableValue.equalsIgnoreCase(SchemaInferenceUtil.INFER_SCHEMA.getValue())) {
- final RecordSourceFactory<CSVRecord> sourceFactory = (var, in) -> new CSVRecordSource(in, context);
- final SchemaInferenceEngine<CSVRecord> inference = new CSVSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
+ final RecordSourceFactory<CSVRecordAndFieldNames> sourceFactory = (var, in) -> new CSVRecordSource(in, context);
+ final SchemaInferenceEngine<CSVRecordAndFieldNames> inference = new CSVSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
return new InferSchemaAccessStrategy<>(sourceFactory, inference, getLogger());
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordAndFieldNames.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordAndFieldNames.java
new file mode 100644
index 0000000..b4413de
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordAndFieldNames.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVRecord;
+
+import java.util.List;
+
+public class CSVRecordAndFieldNames {
+ private final CSVRecord record;
+ private final List<String> fieldNames;
+
+ public CSVRecordAndFieldNames(final CSVRecord record, final List<String> fieldNames) {
+ this.record = record;
+ this.fieldNames = fieldNames;
+ }
+
+ public CSVRecord getRecord() {
+ return record;
+ }
+
+ public List<String> getFieldNames() {
+ return fieldNames;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java
index ab4362a..20a8407 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java
@@ -29,10 +29,14 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
-public class CSVRecordSource implements RecordSource<CSVRecord> {
+public class CSVRecordSource implements RecordSource<CSVRecordAndFieldNames> {
private final Iterator<CSVRecord> csvRecordIterator;
+ private final List<String> fieldNames;
public CSVRecordSource(final InputStream in, final PropertyContext context) throws IOException {
final String charset = context.getProperty(CSVUtils.CHARSET).getValue();
@@ -46,14 +50,16 @@ public class CSVRecordSource implements RecordSource<CSVRecord> {
final CSVFormat csvFormat = CSVUtils.createCSVFormat(context).withFirstRecordAsHeader().withTrim();
final CSVParser csvParser = new CSVParser(reader, csvFormat);
+ fieldNames = Collections.unmodifiableList(new ArrayList<>(csvParser.getHeaderMap().keySet()));
+
csvRecordIterator = csvParser.iterator();
}
@Override
- public CSVRecord next() {
+ public CSVRecordAndFieldNames next() {
if (csvRecordIterator.hasNext()) {
final CSVRecord record = csvRecordIterator.next();
- return record;
+ return new CSVRecordAndFieldNames(record, fieldNames);
}
return null;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
index b81ddc0..10b18a9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
@@ -19,8 +19,8 @@ package org.apache.nifi.csv;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.nifi.schema.inference.FieldTypeInference;
-import org.apache.nifi.schema.inference.SchemaInferenceEngine;
import org.apache.nifi.schema.inference.RecordSource;
+import org.apache.nifi.schema.inference.SchemaInferenceEngine;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
@@ -35,7 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecord> {
+public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFieldNames> {
private final TimeValueInference timeValueInference;
@@ -45,30 +45,29 @@ public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecord> {
@Override
- public RecordSchema inferSchema(final RecordSource<CSVRecord> recordSource) throws IOException {
+ public RecordSchema inferSchema(final RecordSource<CSVRecordAndFieldNames> recordSource) throws IOException {
final Map<String, FieldTypeInference> typeMap = new LinkedHashMap<>();
while (true) {
- final CSVRecord rawRecord = recordSource.next();
- if (rawRecord == null) {
+ final CSVRecordAndFieldNames recordAndFieldNames = recordSource.next();
+ if (recordAndFieldNames == null) {
break;
}
- inferSchema(rawRecord, typeMap);
+ inferSchema(recordAndFieldNames, typeMap);
}
return createSchema(typeMap);
}
- private void inferSchema(final CSVRecord csvRecord, final Map<String, FieldTypeInference> typeMap) {
- final Map<String, String> values = csvRecord.toMap();
- for (final Map.Entry<String, String> entry : values.entrySet()) {
- final String value = entry.getValue();
+ private void inferSchema(final CSVRecordAndFieldNames recordAndFieldNames, final Map<String, FieldTypeInference> typeMap) {
+ final CSVRecord csvRecord = recordAndFieldNames.getRecord();
+ for (final String fieldName : recordAndFieldNames.getFieldNames()) {
+ final String value = csvRecord.get(fieldName);
if (value == null) {
return;
}
- final String fieldName = entry.getKey();
final FieldTypeInference typeInference = typeMap.computeIfAbsent(fieldName, key -> new FieldTypeInference());
final String trimmed = trim(value);
final DataType dataType = getDataType(trimmed);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
index ce801d6..51d3eb2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
@@ -32,7 +32,9 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -74,5 +76,10 @@ public class TestCSVSchemaInference {
assertSame(RecordFieldType.INT, schema.getDataType("parentIds").get().getFieldType());
assertSame(RecordFieldType.STRING, schema.getDataType("numeric string").get().getFieldType());
+
+ final List<String> fieldNames = schema.getFieldNames();
+ assertEquals(Arrays.asList("eventId", "eventOrdinal", "eventType", "timestampMillis", "timestamp", "eventDate", "eventTime", "maybeTime", "maybeDate", "durationMillis", "lineageStart",
+ "componentId", "componentType", "componentName", "processGroupId", "processGroupName", "entityId", "entityType", "entitySize", "previousEntitySize", "updatedAttributes", "actorHostname",
+ "contentURI", "previousContentURI", "parentIds", "childIds", "platform", "application", "extra field", "numeric string"), fieldNames);
}
}