You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/05/03 17:14:01 UTC
nifi git commit: NIFI-3787: Addressed NPE and ensure that if
validation fails due to RuntimeException,
that it gets logged. Also clarified documentation for Json Reader services
Repository: nifi
Updated Branches:
refs/heads/master b7c15c360 -> 9b177fbcb
NIFI-3787: Addressed NPE and ensure that if validation fails due to RuntimeException, that it gets logged. Also clarified documentation for Json Reader services
This closes #1742.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9b177fbc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9b177fbc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9b177fbc
Branch: refs/heads/master
Commit: 9b177fbcbabe67b9ebb1d0a5d3fafe7b67aebda1
Parents: b7c15c3
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed May 3 12:33:30 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Wed May 3 13:13:43 2017 -0400
----------------------------------------------------------------------
.../nifi/serialization/record/MapRecord.java | 2 +-
.../record/util/DataTypeUtils.java | 23 +++++++++
.../nifi/serialization/DateTimeUtils.java | 13 +++--
.../controller/AbstractConfiguredComponent.java | 4 ++
.../java/org/apache/nifi/avro/AvroReader.java | 22 +++++++++
.../apache/nifi/avro/AvroRecordSetWriter.java | 2 -
.../avro/EmbeddedAvroSchemaAccessStrategy.java | 51 ++++++++++++++++++++
.../nifi/csv/CSVHeaderSchemaStrategy.java | 9 ++++
.../java/org/apache/nifi/csv/CSVReader.java | 12 ++++-
.../org/apache/nifi/csv/CSVRecordSetWriter.java | 3 +-
.../java/org/apache/nifi/grok/GrokReader.java | 37 +++++++++-----
.../nifi/json/JsonPathRowRecordReader.java | 6 +--
.../nifi/json/JsonTreeRowRecordReader.java | 6 +--
.../additionalDetails.html | 3 +-
.../additionalDetails.html | 6 ++-
15 files changed, 169 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index f9a22fc..8d98c33 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -192,7 +192,7 @@ public class MapRecord implements Record {
@Override
public Date getAsDate(final String fieldName, final String format) {
- return DataTypeUtils.toDate(getValue(fieldName), DataTypeUtils.getDateFormat(format), fieldName);
+ return DataTypeUtils.toDate(getValue(fieldName), format == null ? null : DataTypeUtils.getDateFormat(format), fieldName);
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index f59ac0d..9f1e463 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -32,6 +32,7 @@ import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -343,6 +344,10 @@ public class DataTypeUtils {
return getDateFormat(format).format((java.util.Date) value);
}
+ if (value instanceof Object[]) {
+ return Arrays.toString((Object[]) value);
+ }
+
return value.toString();
}
@@ -396,6 +401,10 @@ public class DataTypeUtils {
}
if (value instanceof String) {
+ if (format == null) {
+ return isInteger((String) value);
+ }
+
try {
getDateFormat(format).parse((String) value);
return true;
@@ -407,6 +416,20 @@ public class DataTypeUtils {
return false;
}
+ private static boolean isInteger(final String value) {
+ if (value == null || value.isEmpty()) {
+ return false;
+ }
+
+ for (int i = 0; i < value.length(); i++) {
+ if (!Character.isDigit(value.charAt(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
public static Time toTime(final Object value, final DateFormat format, final String fieldName) {
if (value == null) {
return null;
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
index efc3e4e..6336943 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
@@ -23,7 +23,9 @@ public class DateTimeUtils {
public static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
.name("Date Format")
.description("Specifies the format to use when reading/writing Date fields. "
- + "If not specified, Date fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
+ + "If not specified, Date fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). "
+ + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+ + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).")
.expressionLanguageSupported(false)
.addValidator(new SimpleDateFormatValidator())
.required(false)
@@ -32,7 +34,9 @@ public class DateTimeUtils {
public static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
.name("Time Format")
.description("Specifies the format to use when reading/writing Time fields. "
- + "If not specified, Time fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
+ + "If not specified, Time fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). "
+ + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
+ + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
.expressionLanguageSupported(false)
.addValidator(new SimpleDateFormatValidator())
.required(false)
@@ -41,7 +45,10 @@ public class DateTimeUtils {
public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
.name("Timestamp Format")
.description("Specifies the format to use when reading/writing Timestamp fields. "
- + "If not specified, Timestamp fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
+ + "If not specified, Timestamp fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). "
+ + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+ + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
+ + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/01/2017 18:04:15).")
.expressionLanguageSupported(false)
.addValidator(new SimpleDateFormatValidator())
.required(false)
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
index 1156987..50a71e7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -30,6 +30,8 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.ArrayList;
@@ -49,6 +51,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent {
+ private static final Logger logger = LoggerFactory.getLogger(AbstractConfiguredComponent.class);
private final String id;
private final ValidationContextFactory validationContextFactory;
@@ -463,6 +466,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
}
}
} catch (final Throwable t) {
+ logger.error("Failed to perform validation of " + this, t);
results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build());
} finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index 205fec6..8451f5a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -29,10 +29,14 @@ import org.apache.avro.Schema;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@@ -63,6 +67,24 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
}
@Override
+ protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ConfigurationContext context) {
+ if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) {
+ return new EmbeddedAvroSchemaAccessStrategy();
+ } else {
+ return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
+ }
+ }
+
+ @Override
+ protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ValidationContext context) {
+ if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) {
+ return new EmbeddedAvroSchemaAccessStrategy();
+ } else {
+ return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
+ }
+ }
+
+ @Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
final String schemaAccessStrategy = getConfigurationContext().getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 91ca6cf..121d1ec 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -65,8 +65,6 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
try {
final RecordSchema recordSchema = getSchema(flowFile, in);
-
-
final Schema avroSchema;
try {
if (recordSchema.getSchemaFormat().isPresent() & recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
new file mode 100644
index 0000000..eba9429
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class EmbeddedAvroSchemaAccessStrategy implements SchemaAccessStrategy {
+ private final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
+
+ @Override
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+ final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(contentStream, new GenericDatumReader<GenericRecord>());
+ final Schema avroSchema = dataFileStream.getSchema();
+ final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
+ return recordSchema;
+ }
+
+ @Override
+ public Set<SchemaField> getSuppliedSchemaFields() {
+ return schemaFields;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
index a57f10b..f2b1cbb 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.io.input.BOMInputStream;
+import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
@@ -47,8 +48,16 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy {
this.context = context;
}
+ public CSVHeaderSchemaStrategy(final ValidationContext context) {
+ this.context = null;
+ }
+
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+ if (this.context == null) {
+ throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema");
+ }
+
try {
final CSVFormat csvFormat = CSVUtils.createCSVFormat(context).withFirstRecordAsHeader();
try (final Reader reader = new InputStreamReader(new BOMInputStream(contentStream));
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index 9fe4136..dbea3dc 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -52,8 +53,6 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
"The first non-comment line of the CSV file is a header line that contains the names of the columns. The schema will be derived by using the "
+ "column names in the header and assuming that all columns are of type String.");
- private volatile SchemaAccessStrategy headerDerivedSchemaStrategy;
-
private volatile CSVFormat csvFormat;
private volatile String dateFormat;
private volatile String timeFormat;
@@ -106,6 +105,15 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
}
@Override
+ protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
+ if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
+ return new CSVHeaderSchemaStrategy(context);
+ }
+
+ return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
+ }
+
+ @Override
protected List<AllowableValue> getSchemaAccessStrategyValues() {
final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
allowableValues.add(headerDerivedAllowableValue);
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
index 15f1e1f..0b29f09 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -38,7 +38,8 @@ import org.apache.nifi.serialization.record.RecordSchema;
@Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"})
@CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written "
- + "will be the column names. All subsequent lines will be the values corresponding to those columns.")
+ + "will be the column names (unless the 'Include Header Line' property is false). All subsequent lines will be the values "
+ + "corresponding to the record fields.")
public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
private volatile CSVFormat csvFormat;
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index 6c8deab..a874632 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -33,6 +33,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -180,24 +181,36 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
- return new SchemaAccessStrategy() {
- private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
-
- @Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
- return recordSchema;
- }
+ return createAccessStrategy();
+ } else {
+ return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
+ }
+ }
- @Override
- public Set<SchemaField> getSuppliedSchemaFields() {
- return schemaFields;
- }
- };
+ @Override
+ protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
+ if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
+ return createAccessStrategy();
} else {
return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
}
}
+ private SchemaAccessStrategy createAccessStrategy() {
+ return new SchemaAccessStrategy() {
+ private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
+
+ @Override
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+ return recordSchema;
+ }
+
+ @Override
+ public Set<SchemaField> getSuppliedSchemaFields() {
+ return schemaFields;
+ }
+ };
+ }
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
index 2eebfe2..7d2a2c1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -62,9 +62,9 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
throws MalformedRecordException, IOException {
super(in, logger);
- this.dateFormat = DataTypeUtils.getDateFormat(dateFormat);
- this.timeFormat = DataTypeUtils.getDateFormat(timeFormat);
- this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat);
+ this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+ this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
+ this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
this.schema = schema;
this.jsonPaths = jsonPaths;
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index f15e04e..ee5bff9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -56,9 +56,9 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
super(in, logger);
this.schema = schema;
- this.dateFormat = DataTypeUtils.getDateFormat(dateFormat);
- this.timeFormat = DataTypeUtils.getDateFormat(timeFormat);
- this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat);
+ this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+ this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
+ this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html
index aceb54d..14d40f6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html
@@ -81,7 +81,8 @@
<li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
<li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
<li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
- property (Date Format, Time Format, Timestamp Format property).</li>
+ property (Date Format, Time Format, Timestamp Format property). If no format is specified for that property, then the value will be converted into a String
+ representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
</ul>
<p>
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
index 90980d1..c08e720 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
@@ -25,7 +25,8 @@
The JsonTreeReader Controller Service reads a JSON Object and creates a Record object for the entire
JSON Object tree. The Controller Service must be configured with a Schema that describes the structure
of the JSON data. If any field exists in the JSON that is not in the schema, that field will be skipped.
- If the schema contains a field for which no JSON field exists, a null value will be used in the Record.
+ If the schema contains a field for which no JSON field exists, a null value will be used in the Record
+ (or the default value defined in the schema, if applicable).
</p>
<p>
@@ -66,7 +67,8 @@
<li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
<li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
<li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
- property (Date Format, Time Format, Timestamp Format property).</li>
+ property (Date Format, Time Format, Timestamp Format property). If no format is specified for that property, then the value will be converted into a String
+ representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
</ul>
<p>