You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/11 23:33:27 UTC
[04/19] nifi git commit: NIFI-1280: Refactoring to make more generic
so that other data types can be supported;
created InputStreams to content on-demand so that multiple passes can be made
over FlowFile content if required. Created new Controller Services [...]
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..1848020
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,77 @@
+nifi-record-serialization-services-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2015 The Apache Software Foundation
+
+ This product includes software from the Spring Framework,
+ under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+ (ASLv2) Grok
+ The following NOTICE information applies:
+ Grok
+ Copyright 2014 Anthony Corbacho, and contributors.
+
+ (ASLv2) Groovy (org.codehaus.groovy:groovy:jar:2.4.5 - http://www.groovy-lang.org)
+ The following NOTICE information applies:
+ Groovy Language
+ Copyright 2003-2015 The respective authors and developers
+ Developers and Contributors are listed in the project POM file
+ and Gradle build file
+
+ This product includes software developed by
+ The Groovy community (http://groovy.codehaus.org/).
+
+ (ASLv2) Google GSON
+ The following NOTICE information applies:
+ Copyright 2008 Google Inc.
+
+ (ASLv2) Jackson JSON processor
+ The following NOTICE information applies:
+ # Jackson JSON processor
+
+ Jackson is a high-performance, Free/Open Source JSON processing library.
+ It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+ been in development since 2007.
+ It is currently developed by a community of developers, as well as supported
+ commercially by FasterXML.com.
+
+ ## Licensing
+
+ Jackson core and extension components may licensed under different licenses.
+ To find the details that apply to this artifact see the accompanying LICENSE file.
+ For more information, including possible other licensing options, contact
+ FasterXML.com (http://fasterxml.com).
+
+ ## Credits
+
+ A list of contributors may be found from CREDITS file, which is included
+ in some artifacts (usually source distributions); but is always available
+ from the source code management (SCM) system project uses.
+
+ (ASLv2) JSON-SMART
+ The following NOTICE information applies:
+ Copyright 2011 JSON-SMART authors
+
+ (ASLv2) JsonPath
+ The following NOTICE information applies:
+ Copyright 2011 JsonPath authors
+
+ (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3)
+
+ (ASLv2) Apache Avro
+ The following NOTICE information applies:
+ Apache Avro
+ Copyright 2009-2013 The Apache Software Foundation
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
new file mode 100644
index 0000000..9b2a56c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -0,0 +1,94 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with this
+ work for additional information regarding copyright ownership. The ASF licenses
+ this file to You under the Apache License, Version 2.0 (the "License"); you
+ may not use this file except in compliance with the License. You may obtain
+ a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+ required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License. -->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services-bundle</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>net.sf.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ <version>2.3</version>
+ </dependency>
+ <dependency>
+ <groupId>io.thekraken</groupId>
+ <artifactId>grok</artifactId>
+ <version>0.1.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>src/test/resources/csv/extra-white-space.csv</exclude>
+ <exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
+ <exclude>src/test/resources/csv/single-bank-account.csv</exclude>
+ <exclude>src/test/resources/grok/error-with-stack-trace.log</exclude>
+ <exclude>src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log</exclude>
+ <exclude>src/test/resources/grok/nifi-log-sample.log</exclude>
+ <exclude>src/test/resources/grok/single-line-log-messages.txt</exclude>
+ <exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude>
+ <exclude>src/test/resources/json/bank-account-array.json</exclude>
+ <exclude>src/test/resources/json/json-with-unicode.json</exclude>
+ <exclude>src/test/resources/json/primitive-type-array.json</exclude>
+ <exclude>src/test/resources/json/single-bank-account.json</exclude>
+ <exclude>src/test/resources/json/single-element-nested-array.json</exclude>
+ <exclude>src/test/resources/json/single-element-nested.json</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
new file mode 100644
index 0000000..fc0c598
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+
+/**
+ * Controller service that acts as a {@link RowRecordReaderFactory} for Avro content.
+ * Each call to {@link #createRecordReader(InputStream, ComponentLog)} wraps the supplied
+ * stream in a new {@link AvroRecordReader}.
+ */
+@Tags({"avro", "parse", "record", "row", "reader"})
+@CapabilityDescription("Parses Avro data and returns each Avro record as a separate record.")
+public class AvroReader extends AbstractControllerService implements RowRecordReaderFactory {
+
+    /**
+     * Creates a reader over the given Avro content.
+     *
+     * @param in the stream containing the Avro data to read
+     * @param logger the component logger; not used by this implementation
+     * @return a new {@link AvroRecordReader} reading from {@code in}
+     * @throws MalformedRecordException declared by the factory contract
+     * @throws IOException if the reader cannot be created from the stream
+     */
+    @Override
+    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException {
+        return new AvroRecordReader(in);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
new file mode 100644
index 0000000..e98a5ad
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * {@link RecordReader} that reads records from an Avro data stream via a
+ * {@link DataFileStream} and converts each {@link GenericRecord} into a {@link MapRecord}.
+ * The {@link RecordSchema} is derived lazily from the Avro schema embedded in the stream.
+ */
+public class AvroRecordReader implements RecordReader {
+    private final InputStream in;
+    private final Schema avroSchema;
+    private final DataFileStream<GenericRecord> dataFileStream;
+    // Created on first call to getSchema() and reused afterwards.
+    private RecordSchema recordSchema;
+
+    /**
+     * Opens an Avro data stream on top of the given input stream and captures its schema.
+     *
+     * @param in the stream containing the Avro data
+     * @throws IOException if the Avro stream cannot be opened
+     * @throws MalformedRecordException declared for callers; not thrown directly by this constructor
+     */
+    public AvroRecordReader(final InputStream in) throws IOException, MalformedRecordException {
+        this.in = in;
+
+        dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>());
+        this.avroSchema = dataFileStream.getSchema();
+        GenericData.setStringType(this.avroSchema, StringType.String);
+    }
+
+    /**
+     * Closes the Avro stream and the underlying input stream. The input stream is closed
+     * even if closing the Avro stream fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            dataFileStream.close();
+        } finally {
+            in.close();
+        }
+    }
+
+    /**
+     * Reads the next record from the stream.
+     *
+     * @return the next record, or {@code null} when no further record is available
+     */
+    @Override
+    public Record nextRecord() throws IOException, MalformedRecordException {
+        GenericRecord avroRecord = null;
+        while (avroRecord == null && dataFileStream.hasNext()) {
+            avroRecord = dataFileStream.next();
+        }
+
+        // The stream may be exhausted without yielding a non-null record.
+        if (avroRecord == null) {
+            return null;
+        }
+
+        final RecordSchema schema = getSchema();
+        final Map<String, Object> values = convertRecordToObjectArray(avroRecord, schema);
+        return new MapRecord(schema, values);
+    }
+
+
+    /**
+     * Builds a field-name to value map for every field named in {@code schema}. Fields that
+     * are absent from the Avro record's own schema are mapped to {@code null}.
+     */
+    private Map<String, Object> convertRecordToObjectArray(final GenericRecord record, final RecordSchema schema) {
+        final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
+
+        for (final String fieldName : schema.getFieldNames()) {
+            final Field avroField = record.getSchema().getField(fieldName);
+            if (avroField == null) {
+                values.put(fieldName, null);
+                continue;
+            }
+
+            final Object value = record.get(fieldName);
+            final Schema fieldSchema = avroField.schema();
+            final DataType dataType = schema.getDataType(fieldName).orElse(null);
+            final Object converted = convertValue(value, fieldSchema, avroField.name(), dataType);
+            values.put(fieldName, converted);
+        }
+
+        return values;
+    }
+
+
+    /**
+     * Returns the record schema derived from the Avro schema, creating it on first use.
+     */
+    @Override
+    public RecordSchema getSchema() throws MalformedRecordException {
+        if (recordSchema == null) {
+            recordSchema = createSchema(avroSchema);
+        }
+
+        return recordSchema;
+    }
+
+    /**
+     * Translates the fields of an Avro record schema into a {@link SimpleRecordSchema}.
+     */
+    private RecordSchema createSchema(final Schema avroSchema) {
+        final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
+        for (final Field field : avroSchema.getFields()) {
+            final String fieldName = field.name();
+            final DataType dataType = determineDataType(field.schema());
+            recordFields.add(new RecordField(fieldName, dataType));
+        }
+
+        return new SimpleRecordSchema(recordFields);
+    }
+
+    /**
+     * Converts a value read by Avro into the representation stored in a {@link MapRecord}.
+     * Records become nested {@link MapRecord}s, bytes and fixed values become {@code byte[]},
+     * enums and strings become {@link String}, arrays become {@code Object[]}, and maps
+     * become {@code Map<String, Object>}. Union values are converted according to the branch
+     * of the union that the value belongs to. Any other value is returned unchanged.
+     *
+     * @param value the Avro value; may be {@code null}
+     * @param avroSchema the Avro schema describing {@code value}
+     * @param fieldName the name of the field being converted
+     * @param desiredType the data type determined for the field; may be {@code null}
+     * @return the converted value, or {@code null} if {@code value} is {@code null}
+     */
+    private Object convertValue(final Object value, final Schema avroSchema, final String fieldName, final DataType desiredType) {
+        if (value == null) {
+            return null;
+        }
+
+        switch (avroSchema.getType()) {
+            case UNION: {
+                final Schema branchSchema = resolveUnionBranch(avroSchema, value);
+                if (branchSchema == null) {
+                    // Branch could not be determined; hand the value back untouched.
+                    return value;
+                }
+                return convertValue(value, branchSchema, fieldName, desiredType);
+            }
+            case RECORD: {
+                final GenericData.Record record = (GenericData.Record) value;
+                final Schema recordSchema = record.getSchema();
+                final List<Field> recordFields = recordSchema.getFields();
+                final Map<String, Object> values = new HashMap<>(recordFields.size());
+                for (final Field field : recordFields) {
+                    final DataType desiredFieldType = determineDataType(field.schema());
+                    final Object avroFieldValue = record.get(field.name());
+                    final Object fieldValue = convertValue(avroFieldValue, field.schema(), field.name(), desiredFieldType);
+                    values.put(field.name(), fieldValue);
+                }
+                final RecordSchema childSchema = createSchema(recordSchema);
+                return new MapRecord(childSchema, values);
+            }
+            case BYTES: {
+                // Copy only the readable region; array() would expose the whole backing array
+                // and is unavailable for read-only or direct buffers. duplicate() leaves the
+                // caller's buffer position untouched.
+                final ByteBuffer bb = ((ByteBuffer) value).duplicate();
+                final byte[] bytes = new byte[bb.remaining()];
+                bb.get(bytes);
+                return bytes;
+            }
+            case FIXED: {
+                final GenericFixed fixed = (GenericFixed) value;
+                return fixed.bytes();
+            }
+            case ENUM:
+            case STRING:
+                return value.toString();
+            case NULL:
+                return null;
+            case ARRAY: {
+                final Array<?> array = (Array<?>) value;
+                // The element schema and type are the same for every element.
+                final Schema elementSchema = avroSchema.getElementType();
+                final DataType elementType = determineDataType(elementSchema);
+                final Object[] valueArray = new Object[array.size()];
+                for (int i = 0; i < valueArray.length; i++) {
+                    valueArray[i] = convertValue(array.get(i), elementSchema, fieldName, elementType);
+                }
+                return valueArray;
+            }
+            case MAP: {
+                final Map<?, ?> avroMap = (Map<?, ?>) value;
+                final Schema valueSchema = avroSchema.getValueType();
+                final DataType valueType = determineDataType(valueSchema);
+                final Map<String, Object> map = new HashMap<>(avroMap.size());
+                for (final Map.Entry<?, ?> entry : avroMap.entrySet()) {
+                    Object obj = convertValue(entry.getValue(), valueSchema, fieldName, valueType);
+                    if (obj instanceof CharSequence) {
+                        obj = obj.toString();
+                    }
+
+                    map.put(entry.getKey().toString(), obj);
+                }
+                return map;
+            }
+            default:
+                return value;
+        }
+    }
+
+    /**
+     * Determines which branch of a union schema the given value belongs to.
+     *
+     * @return the branch schema, or {@code null} if the branch cannot be determined
+     */
+    private Schema resolveUnionBranch(final Schema unionSchema, final Object value) {
+        if (value instanceof GenericData.Record) {
+            return ((GenericData.Record) value).getSchema();
+        }
+
+        try {
+            final int branchIndex = GenericData.get().resolveUnion(unionSchema, value);
+            final Schema branch = unionSchema.getTypes().get(branchIndex);
+            // Guard against recursing on a union again.
+            return branch.getType() == Type.UNION ? null : branch;
+        } catch (final org.apache.avro.AvroRuntimeException e) {
+            // Value does not match any branch; the caller falls back to the raw value.
+            return null;
+        }
+    }
+
+
+    /**
+     * Maps an Avro schema to the corresponding record {@link DataType}. A union with exactly
+     * one non-null branch maps to that branch's type; any other union maps to a CHOICE of its
+     * non-null branch types.
+     *
+     * @return the data type, or {@code null} if the Avro type is not handled by the switch
+     */
+    private DataType determineDataType(final Schema avroSchema) {
+        final Type avroType = avroSchema.getType();
+
+        switch (avroType) {
+            case ARRAY:
+            case BYTES:
+            case FIXED:
+                return RecordFieldType.ARRAY.getDataType();
+            case BOOLEAN:
+                return RecordFieldType.BOOLEAN.getDataType();
+            case DOUBLE:
+                return RecordFieldType.DOUBLE.getDataType();
+            case ENUM:
+            case STRING:
+                return RecordFieldType.STRING.getDataType();
+            case FLOAT:
+                return RecordFieldType.FLOAT.getDataType();
+            case INT:
+                return RecordFieldType.INT.getDataType();
+            case LONG:
+                return RecordFieldType.LONG.getDataType();
+            case RECORD: {
+                final List<Field> avroFields = avroSchema.getFields();
+                final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
+
+                for (final Field field : avroFields) {
+                    final String fieldName = field.name();
+                    final Schema fieldSchema = field.schema();
+                    final DataType fieldType = determineDataType(fieldSchema);
+                    recordFields.add(new RecordField(fieldName, fieldType));
+                }
+
+                final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+                return RecordFieldType.RECORD.getDataType(recordSchema);
+            }
+            case NULL:
+            case MAP:
+                // NOTE(review): NULL and MAP are both reported as a schema-less RECORD type -- confirm intended.
+                return RecordFieldType.RECORD.getDataType();
+            case UNION: {
+                final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
+                    .filter(s -> s.getType() != Type.NULL)
+                    .collect(Collectors.toList());
+
+                if (nonNullSubSchemas.size() == 1) {
+                    return determineDataType(nonNullSubSchemas.get(0));
+                }
+
+                final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
+                for (final Schema subSchema : nonNullSubSchemas) {
+                    final DataType childDataType = determineDataType(subSchema);
+                    possibleChildTypes.add(childDataType);
+                }
+
+                return RecordFieldType.CHOICE.getDataType(possibleChildTypes);
+            }
+        }
+
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
new file mode 100644
index 0000000..d56c716
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+/**
+ * {@link RecordSetWriterFactory} that produces writers emitting Avro using the schema
+ * configured through the {@link #SCHEMA} property. The schema text is parsed once when
+ * the service is enabled and reused for every writer created afterwards.
+ */
+@Tags({"avro", "result", "set", "writer", "serializer", "record", "row"})
+@CapabilityDescription("Writes the contents of a Database ResultSet in Binary Avro format. The data types in the Result Set must match those "
+    + "specified by the Avro Schema. No type coercion will occur, with the exception of Date, Time, and Timestamps fields because Avro does not provide "
+    + "support for these types specifically. As a result, they will be converted to String fields using the configured formats. In addition, the label "
+    + "of the column must be a valid Avro field name.")
+public class AvroRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory {
+    static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
+        .name("Avro Schema")
+        .description("The Avro Schema to use when writing out the Result Set")
+        .addValidator(new AvroSchemaValidator())
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+
+    // Assigned in storePropertyValues(...) when the service is enabled; read by createWriter(...).
+    private volatile Schema schema;
+
+    /**
+     * Returns the property descriptors supplied by the superclass plus {@link #SCHEMA}.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(SCHEMA);
+        return properties;
+    }
+
+    /**
+     * Parses the configured Avro schema text and stores the result for later use.
+     *
+     * @param context the configuration from which the {@link #SCHEMA} property value is read
+     */
+    @OnEnabled
+    public void storePropertyValues(final ConfigurationContext context) {
+        schema = new Schema.Parser().parse(context.getProperty(SCHEMA).getValue());
+    }
+
+    /**
+     * Creates a writer bound to the stored schema and the date, time and timestamp formats
+     * returned by {@code getDateFormat()}, {@code getTimeFormat()} and {@code getTimestampFormat()}.
+     *
+     * @param logger the component logger; not used by this implementation
+     * @return a new {@link WriteAvroResult}
+     */
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger) {
+        return new WriteAvroResult(schema, getDateFormat(), getTimeFormat(), getTimestampFormat());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
new file mode 100644
index 0000000..7151348
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+/**
+ * {@link Validator} that accepts a property value only if it can be parsed as an Avro schema.
+ */
+public class AvroSchemaValidator implements Validator {
+
+    /**
+     * Attempts to parse {@code input} with a fresh {@link Schema.Parser}.
+     *
+     * @param subject the name of what is being validated, echoed into the result
+     * @param input the candidate schema text, echoed into the result
+     * @param context the validation context; not used by this implementation
+     * @return a valid result if parsing succeeds; otherwise an invalid result whose
+     *         explanation carries the parser's error message
+     */
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        try {
+            new Schema.Parser().parse(input);
+
+            // Populate subject and input on success too, matching the failure result.
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(true)
+                .build();
+        } catch (final Exception e) {
+            // Any failure to parse, whatever its type, means the value is not usable as a schema.
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(false)
+                .explanation("Not a valid Avro Schema: " + e.getMessage())
+                .build();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
new file mode 100644
index 0000000..d75d86d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+/**
+ * Writes records to an Avro data file using a fixed Avro {@link Schema}. Record fields that have
+ * no counterpart in the Avro schema are skipped, and every value is coerced by
+ * {@code convert} into a type that Avro's generic writer accepts before it is stored.
+ */
+public class WriteAvroResult implements RecordSetWriter {
+    private final Schema schema;
+    private final DateFormat dateFormat;
+    private final DateFormat timeFormat;
+    private final DateFormat timestampFormat;
+
+    /**
+     * @param schema the Avro schema used for the data file and for every record written to it
+     * @param dateFormat {@link SimpleDateFormat} pattern used to render date values as strings
+     * @param timeFormat {@link SimpleDateFormat} pattern used to render {@link Time} values as strings
+     * @param timestampFormat {@link SimpleDateFormat} pattern used to render {@link Timestamp} values as strings
+     */
+    public WriteAvroResult(final Schema schema, final String dateFormat, final String timeFormat, final String timestampFormat) {
+        this.schema = schema;
+        this.dateFormat = new SimpleDateFormat(dateFormat);
+        this.timeFormat = new SimpleDateFormat(timeFormat);
+        this.timestampFormat = new SimpleDateFormat(timestampFormat);
+    }
+
+    /**
+     * Writes every record of the set into a single Avro data file on {@code outStream}.
+     * If the set yields no record at all, nothing (not even the Avro header) is written.
+     *
+     * @param rs the records to write
+     * @param outStream the destination stream
+     * @return the number of records written, with an empty attribute map
+     * @throws IOException if the data cannot be written or a CLOB/BLOB value cannot be read
+     */
+    @Override
+    public WriteResult write(final RecordSet rs, final OutputStream outStream) throws IOException {
+        Record record = rs.next();
+        if (record == null) {
+            return WriteResult.of(0, Collections.emptyMap());
+        }
+
+        int nrOfRows = 0;
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, outStream);
+
+            final RecordSchema recordSchema = rs.getSchema();
+
+            do {
+                dataFileWriter.append(toAvroRecord(record, recordSchema));
+                nrOfRows++;
+            } while ((record = rs.next()) != null);
+        }
+
+        return WriteResult.of(nrOfRows, Collections.emptyMap());
+    }
+
+    /**
+     * Writes a single record as a complete Avro data file on {@code out}.
+     *
+     * @param record the record to write
+     * @param out the destination stream
+     * @return a result reporting one record written, with an empty attribute map
+     * @throws IOException if the data cannot be written or a CLOB/BLOB value cannot be read
+     */
+    @Override
+    public WriteResult write(final Record record, final OutputStream out) throws IOException {
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, out);
+            dataFileWriter.append(toAvroRecord(record, record.getSchema()));
+        }
+
+        return WriteResult.of(1, Collections.emptyMap());
+    }
+
+    /**
+     * Converts a top-level record against this writer's schema, reporting any
+     * {@link SQLException} raised while reading LOB values as an {@link IOException}.
+     */
+    private GenericRecord toAvroRecord(final Record record, final RecordSchema recordSchema) throws IOException {
+        try {
+            return toAvroRecord(record, recordSchema, schema);
+        } catch (final SQLException e) {
+            throw new IOException("Failed to write records to stream", e);
+        }
+    }
+
+    /**
+     * Builds an Avro record for {@code avroSchema} from the fields named by {@code recordSchema}.
+     * Fields that the Avro schema does not declare are ignored.
+     */
+    private GenericRecord toAvroRecord(final Record record, final RecordSchema recordSchema, final Schema avroSchema)
+            throws SQLException, IOException {
+        final GenericRecord avroRecord = new GenericData.Record(avroSchema);
+
+        for (final String fieldName : recordSchema.getFieldNames()) {
+            final Field field = avroSchema.getField(fieldName);
+            if (field == null) {
+                continue;
+            }
+
+            final Object value = record.getValue(fieldName);
+            avroRecord.put(fieldName, convert(value, field.schema(), fieldName));
+        }
+
+        return avroRecord;
+    }
+
+    /**
+     * Coerces a record value into something Avro's generic writer can serialize.
+     *
+     * @param value the raw value; may be {@code null}
+     * @param schema the Avro schema of the field the value is destined for
+     * @param fieldName the name of that field, passed along when array elements are converted
+     * @return the converted value, or {@code null} if {@code value} is {@code null}
+     * @throws SQLException if a CLOB or BLOB cannot be accessed
+     * @throws IOException if reading a CLOB or BLOB stream fails
+     */
+    private Object convert(final Object value, final Schema schema, final String fieldName) throws SQLException, IOException {
+        if (value == null) {
+            return null;
+        }
+
+        // LOBs have to be materialized (and released) before anything else looks at them.
+        if (value instanceof Clob) {
+            return readClob((Clob) value);
+        }
+
+        if (value instanceof Blob) {
+            return readBlob((Blob) value);
+        }
+
+        if (value instanceof byte[]) {
+            // bytes requires little bit different handling
+            return ByteBuffer.wrap((byte[]) value);
+        } else if (value instanceof Byte) {
+            // tinyint(1) type is returned by JDBC driver as java.sql.Types.TINYINT
+            // But value is returned by JDBC as java.lang.Byte
+            // (at least H2 JDBC works this way)
+            // direct put to avro record results:
+            // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
+            return ((Byte) value).intValue();
+        } else if (value instanceof Short) {
+            // MS SQL returns TINYINT as a Java Short, which Avro doesn't understand.
+            return ((Short) value).intValue();
+        } else if (value instanceof BigDecimal) {
+            // Avro can't handle BigDecimal as a number - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
+            return value.toString();
+        } else if (value instanceof BigInteger) {
+            // Avro can't handle BigInteger as a number either. A value that fits into a long is
+            // handed over as a long; anything outside the long range, in EITHER direction, is
+            // written as a string instead of being silently truncated by longValue().
+            final BigInteger bigInt = (BigInteger) value;
+            if (bigInt.bitLength() > 63) {
+                return value.toString();
+            } else {
+                return bigInt.longValue();
+            }
+        } else if (value instanceof Boolean) {
+            return value;
+        } else if (value instanceof Record && schema.getType() == Schema.Type.RECORD) {
+            // Tested on its own (rather than inside the Map branch) so that the cast is always safe.
+            final Record nested = (Record) value;
+            return toAvroRecord(nested, nested.getSchema(), schema);
+        } else if (value instanceof Map) {
+            // TODO: Revisit how we handle a lot of these cases....
+            if (schema.getType() == Schema.Type.MAP) {
+                return value;
+            }
+
+            return value.toString();
+        } else if (value instanceof List) {
+            return value;
+        } else if (value instanceof Object[]) {
+            final List<Object> list = new ArrayList<>();
+            for (final Object o : ((Object[]) value)) {
+                final Object converted = convert(o, schema.getElementType(), fieldName);
+                list.add(converted);
+            }
+            return list;
+        } else if (value instanceof Number) {
+            return value;
+        } else if (value instanceof Timestamp) {
+            // Timestamp, Time and java.sql.Date all extend java.util.Date, so they must be
+            // tested before the general java.util.Date case or their formats are never applied.
+            return timestampFormat.format((java.util.Date) value);
+        } else if (value instanceof Time) {
+            return timeFormat.format((java.util.Date) value);
+        } else if (value instanceof java.sql.Date) {
+            return dateFormat.format((java.util.Date) value);
+        } else if (value instanceof java.util.Date) {
+            return dateFormat.format((java.util.Date) value);
+        }
+
+        // The different types that we support are numbers (int, long, double, float),
+        // as well as boolean values and Strings. Since Avro doesn't provide
+        // timestamp types, we want to convert those to Strings. So we will cast anything other
+        // than numbers or booleans to strings by using the toString() method.
+        return value.toString();
+    }
+
+    /**
+     * Reads the content of a CLOB through its ASCII stream and frees the CLOB afterwards.
+     * NOTE(review): characters outside ASCII are at the mercy of the driver's ASCII stream.
+     */
+    private String readClob(final Clob clob) throws SQLException, IOException {
+        final char[] buffer = new char[(int) clob.length()];
+        int index = 0;
+
+        try (final InputStream is = clob.getAsciiStream()) {
+            int c;
+            // read() signals end of stream with -1; zero is a legitimate value.
+            while (index < buffer.length && (c = is.read()) >= 0) {
+                buffer[index++] = (char) c;
+            }
+        }
+
+        clob.free();
+        return new String(buffer, 0, index);
+    }
+
+    /**
+     * Reads the content of a BLOB into a {@link ByteBuffer} and frees the BLOB afterwards.
+     */
+    private ByteBuffer readBlob(final Blob blob) throws SQLException, IOException {
+        final byte[] buffer = new byte[(int) blob.length()];
+        int index = 0;
+
+        try (final InputStream is = blob.getBinaryStream()) {
+            int read;
+            // Binary content routinely contains zero bytes, so only end of stream (-1) stops us.
+            while (index < buffer.length && (read = is.read(buffer, index, buffer.length - index)) > 0) {
+                index += read;
+            }
+        }
+
+        final ByteBuffer bb = ByteBuffer.wrap(buffer, 0, index);
+        blob.free();
+        return bb;
+    }
+
+
+    @Override
+    public String getMimeType() {
+        return "application/avro-binary";
+    }
+
+
+    /**
+     * Turns an arbitrary name into one made up only of {@code [A-Za-z0-9_]} that does not start
+     * with a digit. Every other character is replaced by an underscore, and an underscore is
+     * prepended when the result would otherwise be empty or start with a digit.
+     *
+     * @param inputName the name to normalize; must not be {@code null}
+     * @return the normalized name, never empty
+     */
+    public static String normalizeNameForAvro(String inputName) {
+        String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_");
+        if (normalizedName.isEmpty() || Character.isDigit(normalizedName.charAt(0))) {
+            normalizedName = "_" + normalizedName;
+        }
+        return normalizedName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
new file mode 100644
index 0000000..eccad7d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.UserTypeOverrideRowReader;
+
+/**
+ * Controller service that creates {@link CSVRecordReader}s, passing along the per-column
+ * type overrides obtained from {@code getFieldTypeOverrides()}.
+ */
+@Tags({"csv", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"})
+@CapabilityDescription("Parses CSV-formatted data, returning each row in the CSV file as a separate record. "
+    + "This reader assumes that the first line in the content is the column names and all subsequent lines are "
+    + "the values. By default, the reader will assume that all columns are of 'String' type, but this can be "
+    + "overridden by adding a user-defined Property where the key is the name of a column and the value is the "
+    + "type of the column. For example, if a Property has the name \"balance\" with a value of float, the "
+    + "reader will attempt to coerce all values in the \"balance\" column into a floating-point number. See "
+    + "Controller Service's Usage for further documentation.")
+@DynamicProperty(name = "<name of column in CSV>", value = "<type of column values in CSV>",
+    description = "User-defined properties are used to indicate that the values of a specific column should be interpreted as a "
+    + "user-defined data type (e.g., int, double, float, date, etc.)", supportsExpressionLanguage = false)
+public class CSVReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory {
+
+    /**
+     * Creates a reader over the given CSV content.
+     *
+     * @param in the stream holding the CSV data
+     * @param logger the logger handed to the created reader
+     * @return a new {@link CSVRecordReader} for {@code in}
+     * @throws IOException if the reader cannot be created from the stream
+     */
+    @Override
+    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException {
+        return new CSVRecordReader(in, logger, getFieldTypeOverrides());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
new file mode 100644
index 0000000..c2e8963
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import au.com.bytecode.opencsv.CSVReader;
+
+/**
+ * {@link RecordReader} for CSV content. The first line supplies the column names; every later
+ * line becomes one record. Columns are strings unless a type override exists for the column name.
+ */
+public class CSVRecordReader implements RecordReader {
+    private final ComponentLog logger;
+    private final CSVReader reader;
+    private final String[] firstLine;
+    private final Map<String, DataType> fieldTypeOverrides;
+    private RecordSchema schema;
+
+    /**
+     * Opens the CSV content and consumes its first line as the header.
+     *
+     * @param in the stream holding the CSV data
+     * @param logger used to report skipped lines and unparseable date/time values
+     * @param fieldTypeOverrides data types keyed by column name; columns not listed are strings
+     * @throws IOException if the header line cannot be read
+     */
+    public CSVRecordReader(final InputStream in, final ComponentLog logger, final Map<String, DataType> fieldTypeOverrides) throws IOException {
+        this.logger = logger;
+        this.fieldTypeOverrides = fieldTypeOverrides;
+        reader = new CSVReader(new InputStreamReader(new BufferedInputStream(in)));
+
+        // readNext() yields null when there is no line at all; normalize that to "no columns"
+        // so that getSchema() can apply its fallback instead of failing on a null header.
+        final String[] header = reader.readNext();
+        firstLine = header == null ? new String[0] : header;
+    }
+
+    /**
+     * Reads the next line whose number of values matches the schema; other lines are logged and skipped.
+     *
+     * @return the next record, or {@code null} when the content is exhausted
+     * @throws IOException if the underlying reader fails
+     * @throws MalformedRecordException if a value of the line cannot be converted
+     */
+    @Override
+    public Record nextRecord() throws IOException, MalformedRecordException {
+        final RecordSchema schema = getSchema();
+
+        while (true) {
+            final String[] line = reader.readNext();
+            if (line == null) {
+                return null;
+            }
+
+            final List<DataType> fieldTypes = schema.getDataTypes();
+            if (fieldTypes.size() != line.length) {
+                logger.warn("Found record with incorrect number of fields. Expected {} but found {}; skipping record", new Object[] {fieldTypes.size(), line.length});
+                continue;
+            }
+
+            try {
+                final Map<String, Object> rowValues = new HashMap<>(schema.getFieldCount());
+
+                int i = 0;
+                for (final String fieldName : schema.getFieldNames()) {
+                    if (i >= line.length) {
+                        rowValues.put(fieldName, null);
+                        continue;
+                    }
+
+                    final String rawValue = line[i++].trim();
+                    final Object converted = convert(schema.getDataType(fieldName).orElse(null), rawValue);
+                    rowValues.put(fieldName, converted);
+                }
+
+                return new MapRecord(schema, rowValues);
+            } catch (final Exception e) {
+                throw new MalformedRecordException("Found invalid CSV record", e);
+            }
+        }
+    }
+
+    /**
+     * Builds the schema from the header line on first use and returns the same instance afterwards.
+     * A header without any column produces a schema with a single string field named "line".
+     */
+    @Override
+    public RecordSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+
+        final List<RecordField> recordFields = new ArrayList<>();
+        for (final String element : firstLine) {
+            final String name = element.trim();
+
+            final DataType overriddenDataType = fieldTypeOverrides.get(name);
+            final DataType dataType = overriddenDataType != null ? overriddenDataType : RecordFieldType.STRING.getDataType();
+
+            recordFields.add(new RecordField(name, dataType));
+        }
+
+        if (recordFields.isEmpty()) {
+            recordFields.add(new RecordField("line", RecordFieldType.STRING.getDataType()));
+        }
+
+        schema = new SimpleRecordSchema(recordFields);
+        return schema;
+    }
+
+    /**
+     * Converts a raw CSV value into the Java type that corresponds to {@code dataType}.
+     * For boolean, numeric and date/time types an empty value becomes {@code null}; for every
+     * other type, and when {@code dataType} is {@code null}, the value is returned unchanged.
+     *
+     * @param dataType the target type, or {@code null} to leave the value as it is
+     * @param value the raw text
+     * @return the converted value; {@code null} for empty input of a typed column or for a
+     *         date/time value that does not match the type's format
+     * @throws NumberFormatException if a numeric value cannot be parsed
+     */
+    protected Object convert(final DataType dataType, final String value) {
+        if (dataType == null) {
+            return value;
+        }
+
+        switch (dataType.getFieldType()) {
+            case BOOLEAN:
+                return value.isEmpty() ? null : Boolean.valueOf(value);
+            case BYTE:
+                return value.isEmpty() ? null : Byte.valueOf(value);
+            case SHORT:
+                return value.isEmpty() ? null : Short.valueOf(value);
+            case INT:
+                return value.isEmpty() ? null : Integer.valueOf(value);
+            case LONG:
+            case BIGINT:
+                return value.isEmpty() ? null : Long.valueOf(value);
+            case FLOAT:
+                return value.isEmpty() ? null : Float.valueOf(value);
+            case DOUBLE:
+                return value.isEmpty() ? null : Double.valueOf(value);
+            case DATE: {
+                final Date parsed = parseTemporal(dataType, value, "DATE");
+                return parsed == null ? null : new java.sql.Date(parsed.getTime());
+            }
+            case TIME: {
+                final Date parsed = parseTemporal(dataType, value, "TIME");
+                return parsed == null ? null : new java.sql.Time(parsed.getTime());
+            }
+            case TIMESTAMP: {
+                final Date parsed = parseTemporal(dataType, value, "TIMESTAMP");
+                return parsed == null ? null : new java.sql.Timestamp(parsed.getTime());
+            }
+            case STRING:
+            default:
+                return value;
+        }
+    }
+
+    /**
+     * Parses a date/time value using the format of {@code dataType}. Returns {@code null} for an
+     * empty value, and also (after logging a warning) for a value that does not match the format.
+     */
+    private Date parseTemporal(final DataType dataType, final String value, final String typeName) {
+        if (value.isEmpty()) {
+            return null;
+        }
+
+        try {
+            return new SimpleDateFormat(dataType.getFormat()).parse(value);
+        } catch (final ParseException e) {
+            logger.warn("Found invalid value for " + typeName + " field: " + value + " does not match expected format of "
+                + dataType.getFormat() + "; will substitute a NULL value for this field");
+            return null;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
new file mode 100644
index 0000000..906e9c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+/**
+ * Controller service that creates {@link WriteCSVResult} writers configured with the date,
+ * time and timestamp formats returned by the inherited getters.
+ */
+@Tags({"csv", "result", "set", "writer", "serializer", "record", "row"})
+@CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written "
+    + "will be the column names. All subsequent lines will be the values corresponding to those columns.")
+public class CSVRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory {
+
+    /**
+     * @param logger the component logger; not used by the CSV writer
+     * @return a new CSV writer using the configured date, time and timestamp formats
+     */
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger) {
+        return new WriteCSVResult(getDateFormat(), getTimeFormat(), getTimestampFormat());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
new file mode 100644
index 0000000..79c602d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Collections;
+import java.util.Optional;
+
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+
+import au.com.bytecode.opencsv.CSVWriter;
+
+public class WriteCSVResult implements RecordSetWriter {
+ private final String dateFormat;
+ private final String timeFormat;
+ private final String timestampFormat;
+
+ public WriteCSVResult(final String dateFormat, final String timeFormat, final String timestampFormat) {
+ this.dateFormat = dateFormat;
+ this.timeFormat = timeFormat;
+ this.timestampFormat = timestampFormat;
+ }
+
+ private String getFormat(final Record record, final String fieldName) {
+ final Optional<DataType> dataTypeOption = record.getSchema().getDataType(fieldName);
+ if (!dataTypeOption.isPresent()) {
+ return null;
+ }
+
+ final DataType dataType = dataTypeOption.get();
+ switch (dataType.getFieldType()) {
+ case DATE:
+ return dateFormat == null ? dataType.getFormat() : dateFormat;
+ case TIME:
+ return timeFormat == null ? dataType.getFormat() : timeFormat;
+ case TIMESTAMP:
+ return timestampFormat == null ? dataType.getFormat() : timestampFormat;
+ }
+
+ return dataType.getFormat();
+ }
+
+ @Override
+ public WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException {
+ int count = 0;
+ try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut);
+ final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable);
+ final CSVWriter writer = new CSVWriter(streamWriter)) {
+
+ try {
+ final RecordSchema schema = rs.getSchema();
+ final String[] columnNames = schema.getFieldNames().toArray(new String[0]);
+ writer.writeNext(columnNames);
+
+ Record record;
+ while ((record = rs.next()) != null) {
+ final String[] colVals = new String[schema.getFieldCount()];
+ int i = 0;
+ for (final String fieldName : schema.getFieldNames()) {
+ colVals[i++] = record.getAsString(fieldName, getFormat(record, fieldName));
+ }
+
+ writer.writeNext(colVals);
+ count++;
+ }
+ } catch (final Exception e) {
+ throw new IOException("Failed to serialize results", e);
+ }
+ }
+
+ return WriteResult.of(count, Collections.emptyMap());
+ }
+
+ @Override
+ public WriteResult write(final Record record, final OutputStream rawOut) throws IOException {
+ try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut);
+ final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable);
+ final CSVWriter writer = new CSVWriter(streamWriter)) {
+
+ try {
+ final RecordSchema schema = record.getSchema();
+ final String[] columnNames = schema.getFieldNames().toArray(new String[0]);
+ writer.writeNext(columnNames);
+
+ final String[] colVals = new String[schema.getFieldCount()];
+ int i = 0;
+ for (final String fieldName : schema.getFieldNames()) {
+ colVals[i++] = record.getAsString(fieldName, getFormat(record, fieldName));
+ }
+
+ writer.writeNext(colVals);
+ } catch (final Exception e) {
+ throw new IOException("Failed to serialize results", e);
+ }
+ }
+
+ return WriteResult.of(1, Collections.emptyMap());
+ }
+
+ @Override
+ public String getMimeType() {
+ return "text/csv";
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
new file mode 100644
index 0000000..dd9c4e0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+import io.thekraken.grok.api.Grok;
+
+public class GrokExpressionValidator implements Validator {
+
+ @Override
+ public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+ try {
+ new Grok().compile(input);
+ } catch (final Exception e) {
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(false)
+ .explanation("Invalid Grok pattern: " + e.getMessage())
+ .build();
+ }
+
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(true)
+ .build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
new file mode 100644
index 0000000..f72d5d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.UserTypeOverrideRowReader;
+
+import io.thekraken.grok.api.Grok;
+import io.thekraken.grok.api.exception.GrokException;
+
+@Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", "record", "reader", "regex", "pattern", "logstash"})
+@CapabilityDescription("Provides a mechanism for reading unstructured text data, such as log files, and structuring the data "
+    + "so that it can be processed. The service is configured using Grok patterns. "
+    + "The service reads from a stream of data and splits each message that it finds into a separate Record, each containing the fields that are configured. "
+    + "If a line in the input does not match the expected message pattern, the line of text is considered to be part of the previous "
+    + "message, with the exception of stack traces. A stack trace that is found at the end of a log message is considered to be part "
+    + "of the previous message but is added to the 'STACK_TRACE' field of the Record. If a record has no stack trace, it will have a NULL value "
+    + "for the STACK_TRACE field.")
+public class GrokReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory {
+    /** Classpath location of the pattern definitions that are loaded every time the service is enabled. */
+    private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt";
+
+    /** Compiled Grok instance; assigned by {@link #preCompile} and passed to every reader created afterwards. */
+    private volatile Grok grok;
+
+    static final PropertyDescriptor PATTERN_FILE = new PropertyDescriptor.Builder()
+        .name("Grok Pattern File")
+        .description("Path to a file that contains Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file "
+            + "will be used. If specified, all patterns in the given pattern file will override the default patterns. See the Controller Service's "
+            + "Additional Details for a list of pre-defined patterns.")
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(false)
+        .build();
+    static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder()
+        .name("Grok Expression")
+        .description("Specifies the format of a log line in Grok format. This allows the Record Reader to understand how to parse each log line. "
+            + "If a line in the log file does not match this pattern, the line will be assumed to belong to the previous log message.")
+        .addValidator(new GrokExpressionValidator())
+        .required(true)
+        .build();
+
+    /**
+     * @return the properties this service exposes: the optional pattern file followed by the Grok expression
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(PATTERN_FILE);
+        properties.add(GROK_EXPRESSION);
+        return properties;
+    }
+
+    /**
+     * Builds and compiles the {@link Grok} instance used by all readers created by this service.
+     * The bundled default patterns are loaded first, then the patterns from the configured pattern
+     * file (if any), and finally the configured Grok expression is compiled.
+     *
+     * @param context the configuration from which the pattern file and expression are read
+     * @throws GrokException if the patterns or the expression are rejected by Grok
+     * @throws IOException if the bundled default pattern resource is missing or cannot be read
+     */
+    @OnEnabled
+    public void preCompile(final ConfigurationContext context) throws GrokException, IOException {
+        // Configure a local instance and publish it only once it is fully compiled, so that a failure
+        // part-way through never leaves a half-configured Grok visible through the volatile field.
+        final Grok compiled = new Grok();
+
+        try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME)) {
+            if (in == null) {
+                // getResourceAsStream signals a missing resource with null; fail with a clear message instead of an NPE.
+                throw new IOException("Could not find default Grok patterns on the classpath: " + DEFAULT_PATTERN_NAME);
+            }
+
+            // The charset is given explicitly so the bundled resource is decoded the same way on every platform.
+            try (final Reader reader = new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8)) {
+                compiled.addPatternFromReader(reader);
+            }
+        }
+
+        if (context.getProperty(PATTERN_FILE).isSet()) {
+            // The property declares Expression Language support, so the expression must be evaluated
+            // before the value is used as a file path.
+            compiled.addPatternFromFile(context.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue());
+        }
+
+        compiled.compile(context.getProperty(GROK_EXPRESSION).getValue());
+        grok = compiled;
+    }
+
+    /**
+     * Creates a reader that parses the given stream with the Grok instance compiled in {@link #preCompile}.
+     *
+     * @param in the stream to read text from
+     * @param logger not used by this implementation
+     * @return a new {@link GrokRecordReader} configured with this service's field type overrides
+     */
+    @Override
+    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException {
+        return new GrokRecordReader(in, grok, getFieldTypeOverrides());
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
new file mode 100644
index 0000000..bdf12f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import io.thekraken.grok.api.Grok;
+import io.thekraken.grok.api.GrokUtils;
+import io.thekraken.grok.api.Match;
+
+/**
+ * {@link RecordReader} that reads text line by line and converts each line matching the supplied
+ * {@link Grok} expression into a {@link Record}. Lines that follow a matching line but do not match
+ * themselves are either collected as a stack trace (stored in the {@code STACK_TRACE} field) or
+ * appended to the last field that precedes {@code STACK_TRACE} in the schema.
+ */
+public class GrokRecordReader implements RecordReader {
+    static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE";
+
+    // NOTE(review): the leading "^\\s*" is part of the first alternative only, so the remaining
+    // alternatives can match anywhere in a line, and the dots in "..." are unescaped. Left as-is
+    // because changing the regex would change which lines are folded into a stack trace.
+    private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
+        "^\\s*(?:(?: |\\t)+at )|"
+            + "(?:(?: |\\t)+\\[CIRCULAR REFERENCE\\:)|"
+            + "(?:Caused by\\: )|"
+            + "(?:Suppressed\\: )|"
+            + "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
+
+    private static final FastDateFormat TIME_FORMAT_DATE;
+    private static final FastDateFormat TIME_FORMAT_TIME;
+    private static final FastDateFormat TIME_FORMAT_TIMESTAMP;
+
+    static {
+        final TimeZone gmt = TimeZone.getTimeZone("GMT");
+        TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
+        TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
+        TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
+    }
+
+    private final BufferedReader reader;
+    private final Grok grok;
+    private final Map<String, DataType> fieldTypeOverrides;
+
+    /** A line that has been read from the stream but not yet turned into a record, or {@code null}. */
+    private String nextLine;
+    /** Lazily built by {@link #getSchema()}. */
+    private RecordSchema schema;
+
+    /**
+     * @param in the stream to read text from
+     * @param grok the compiled Grok expression used to match each line
+     * @param fieldTypeOverrides data types keyed by field name; fields without an entry are Strings
+     */
+    public GrokRecordReader(final InputStream in, final Grok grok, final Map<String, DataType> fieldTypeOverrides) {
+        // NOTE(review): no charset is given, so the platform default is used to decode the content.
+        this.reader = new BufferedReader(new InputStreamReader(in));
+        this.grok = grok;
+        this.fieldTypeOverrides = fieldTypeOverrides;
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    /**
+     * Reads the next record from the stream.
+     *
+     * @return {@code null} when the stream is exhausted; a record with no values when the line read
+     *         does not match the Grok expression; otherwise a record holding the captured values,
+     *         any trailing unmatched text and any stack trace that followed the line
+     * @throws IOException if the underlying stream cannot be read
+     * @throws MalformedRecordException if the captured values cannot be converted into a record
+     */
+    @Override
+    public Record nextRecord() throws IOException, MalformedRecordException {
+        final String line = nextLine == null ? reader.readLine() : nextLine;
+        nextLine = null; // ensure that we don't process nextLine again
+        if (line == null) {
+            return null;
+        }
+
+        final RecordSchema schema = getSchema();
+
+        final Map<String, Object> valueMap = capture(line);
+        if (valueMap.isEmpty()) { // We were unable to match the pattern so return a Record without any values.
+            return new MapRecord(schema, Collections.emptyMap());
+        }
+
+        // Read ahead: a following line that matches the pattern is left in nextLine for the next call,
+        // anything else is attached to the record that is currently being built.
+        String stackTrace = null;
+        final StringBuilder toAppend = new StringBuilder();
+        while ((nextLine = reader.readLine()) != null) {
+            if (!capture(nextLine).isEmpty()) {
+                // The next line matched our pattern.
+                break;
+            }
+
+            // The next line did not match. If it starts a stack trace, read until the trace ends.
+            // Otherwise, remember it so that it can be appended to the last field of the record.
+            if (isStartOfStackTrace(nextLine)) {
+                stackTrace = readStackTrace(nextLine);
+                break;
+            }
+
+            toAppend.append("\n").append(nextLine);
+        }
+
+        try {
+            final List<String> fieldNames = schema.getFieldNames();
+            final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
+
+            for (final String fieldName : fieldNames) {
+                final Object value = valueMap.get(fieldName);
+                if (value == null) {
+                    values.put(fieldName, null);
+                    continue;
+                }
+
+                final DataType fieldType = schema.getDataType(fieldName).orElse(null);
+                values.put(fieldName, convert(fieldType, value.toString()));
+            }
+
+            if (toAppend.length() > 0) {
+                // getSchema() always puts STACK_TRACE last, so the field before it is at (count - 2).
+                // The lookup happens only when there is text to append, so a schema that holds
+                // nothing but STACK_TRACE no longer fails for records without trailing text.
+                final String lastFieldBeforeStackTrace = fieldNames.get(schema.getFieldCount() - 2);
+                final Object existingValue = values.get(lastFieldBeforeStackTrace);
+                final String updatedValue = existingValue == null ? toAppend.toString() : existingValue + toAppend.toString();
+                values.put(lastFieldBeforeStackTrace, updatedValue);
+            }
+
+            values.put(STACK_TRACE_COLUMN_NAME, stackTrace);
+
+            return new MapRecord(schema, values);
+        } catch (final Exception e) {
+            throw new MalformedRecordException("Found invalid log record and will skip it. Record: " + line, e);
+        }
+    }
+
+    /**
+     * Matches the text against the Grok expression and returns the captured values (empty if no match).
+     */
+    private Map<String, Object> capture(final String text) {
+        final Match match = grok.match(text);
+        match.captures();
+        return match.toMap();
+    }
+
+    /**
+     * Decides whether a line looks like the first line of a stack trace: it must contain
+     * {@code "Exception: "} (or, failing that, {@code "Error: "}) with no space anywhere before it.
+     */
+    private boolean isStartOfStackTrace(final String line) {
+        if (line == null) {
+            return false;
+        }
+
+        // Stack Traces are generally of the form:
+        // java.lang.IllegalArgumentException: My message
+        // at org.apache.nifi.MyClass.myMethod(MyClass.java:48)
+        // at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
+        // Caused by: java.net.SocketTimeoutException: null
+        // ... 13 common frames omitted
+
+        int markerIndex = line.indexOf("Exception: ");
+        if (markerIndex < 0) {
+            markerIndex = line.indexOf("Error: ");
+        }
+
+        if (markerIndex < 0) {
+            return false;
+        }
+
+        // A space before the marker means the class name is preceded by other text.
+        return line.indexOf(" ") >= markerIndex;
+    }
+
+    /**
+     * Reads lines for as long as they belong to the stack trace that starts with {@code firstLine}.
+     * The first line that is not part of the trace is stored in {@link #nextLine} for the next call
+     * to {@link #nextRecord()}.
+     *
+     * @return the complete stack trace, lines joined with {@code "\n"}
+     */
+    private String readStackTrace(final String firstLine) throws IOException {
+        // The caller's read-ahead left the first line of the trace in nextLine. It is consumed here;
+        // clear it so that reaching the end of the stream inside the trace does not cause that line
+        // to be processed again (which would produce a spurious empty record).
+        nextLine = null;
+
+        final StringBuilder sb = new StringBuilder(firstLine);
+
+        String line;
+        while ((line = reader.readLine()) != null) {
+            if (isLineInStackTrace(line)) {
+                sb.append("\n").append(line);
+            } else {
+                nextLine = line;
+                break;
+            }
+        }
+
+        return sb.toString();
+    }
+
+    private boolean isLineInStackTrace(final String line) {
+        return STACK_TRACE_PATTERN.matcher(line).find();
+    }
+
+    /**
+     * Converts captured text into the Java type that corresponds to the given data type.
+     *
+     * @param fieldType the desired type, or {@code null} to return the text unchanged
+     * @param string the captured text
+     * @return the converted value; {@code null} for an empty string of any non-String type and for
+     *         date, time and timestamp text that cannot be parsed; the text itself for Strings and
+     *         unrecognized types
+     * @throws NumberFormatException if non-empty text cannot be parsed as the requested numeric type
+     */
+    protected Object convert(final DataType fieldType, final String string) {
+        if (fieldType == null) {
+            return string;
+        }
+
+        switch (fieldType.getFieldType()) {
+            case BOOLEAN:
+                return string.isEmpty() ? null : Boolean.valueOf(string);
+            case BYTE:
+                return string.isEmpty() ? null : Byte.valueOf(string);
+            case SHORT:
+                return string.isEmpty() ? null : Short.valueOf(string);
+            case INT:
+                return string.isEmpty() ? null : Integer.valueOf(string);
+            case LONG:
+                return string.isEmpty() ? null : Long.valueOf(string);
+            case FLOAT:
+                return string.isEmpty() ? null : Float.valueOf(string);
+            case DOUBLE:
+                return string.isEmpty() ? null : Double.valueOf(string);
+            case DATE: {
+                final Date parsed = parseOrNull(TIME_FORMAT_DATE, string);
+                return parsed == null ? null : new java.sql.Date(parsed.getTime());
+            }
+            case TIME: {
+                final Date parsed = parseOrNull(TIME_FORMAT_TIME, string);
+                return parsed == null ? null : new java.sql.Time(parsed.getTime());
+            }
+            case TIMESTAMP: {
+                final Date parsed = parseOrNull(TIME_FORMAT_TIMESTAMP, string);
+                return parsed == null ? null : new java.sql.Timestamp(parsed.getTime());
+            }
+            case STRING:
+            default:
+                return string;
+        }
+    }
+
+    /**
+     * Parses the text with the given format, returning {@code null} for empty or unparseable text.
+     */
+    private static Date parseOrNull(final FastDateFormat format, final String text) {
+        if (text.isEmpty()) {
+            return null;
+        }
+
+        try {
+            return format.parse(text);
+        } catch (final ParseException e) {
+            // Unparseable values are deliberately mapped to null rather than failing the record.
+            return null;
+        }
+    }
+
+    /**
+     * Derives the schema from the original Grok expression: one field per {@code %{...}} reference,
+     * named after its "subname" and typed from the field type overrides (String when there is no
+     * override), followed by the {@code STACK_TRACE} String field. The result is cached.
+     */
+    @Override
+    public RecordSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+
+        final List<RecordField> fields = new ArrayList<>();
+
+        String remaining = grok.getOriginalGrokPattern();
+        while (remaining.length() > 0) {
+            final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(remaining);
+            if (!matcher.find()) {
+                // Only literal text is left; without this exit the loop would never terminate.
+                break;
+            }
+
+            // NOTE(review): fieldName is presumably null for a reference without a subname; such
+            // references are still added, as before - confirm whether they should be skipped.
+            final Map<String, String> namedGroups = GrokUtils.namedGroups(matcher, remaining);
+            final String fieldName = namedGroups.get("subname");
+
+            DataType dataType = fieldTypeOverrides.get(fieldName);
+            if (dataType == null) {
+                dataType = RecordFieldType.STRING.getDataType();
+            }
+
+            fields.add(new RecordField(fieldName, dataType));
+
+            final int consumed = matcher.end();
+            if (consumed <= 0) {
+                // Defensive: never loop without making progress.
+                break;
+            }
+
+            // Continue directly after the match. Skipping an extra character here would drop the
+            // leading '%' of a reference that immediately follows the previous one.
+            remaining = remaining.substring(consumed);
+        }
+
+        fields.add(new RecordField(STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType()));
+
+        schema = new SimpleRecordSchema(fields);
+        return schema;
+    }
+
+}