Posted to commits@nifi.apache.org by ma...@apache.org on 2017/06/09 20:13:51 UTC

[2/2] nifi git commit: NIFI-3921: Allow Record Writers to inherit schema from Record

NIFI-3921: Allow Record Writers to inherit schema from Record

Signed-off-by: Matt Burgess <ma...@apache.org>

This closes #1902


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e7dcb6f6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e7dcb6f6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e7dcb6f6

Branch: refs/heads/master
Commit: e7dcb6f6c5665a2a2b8b96b39b76417f92e4f190
Parents: 4ed7511
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jun 7 13:42:13 2017 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Fri Jun 9 16:13:25 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/avro/AvroTypeUtil.java | 161 ++++++++++++++++---
 .../schema/access/AvroSchemaTextStrategy.java   |   2 +-
 .../nifi/schema/access/SchemaAccessUtils.java   |  10 ++
 .../WriteAvroSchemaAttributeStrategy.java       |  52 ++++++
 .../org/apache/nifi/avro/TestAvroTypeUtil.java  | 103 ++++++++++++
 .../hadoop/AbstractFetchHDFSRecord.java         |  60 ++++---
 .../hadoop/AbstractPutHDFSRecord.java           |  95 ++---------
 .../serialization/record/MockRecordWriter.java  |   3 +-
 ...onworksAttributeSchemaReferenceStrategy.java |   2 +-
 ...rtonworksEncodedSchemaReferenceStrategy.java |   2 +-
 .../schema/access/InheritSchemaFromRecord.java  |  44 +++++
 .../schema/access/NopSchemaAccessWriter.java    |  49 ++++++
 .../schema/access/SchemaAccessStrategy.java     |   3 +-
 .../access/SchemaNamePropertyStrategy.java      |   2 +-
 .../schema/access/SchemaTextAsAttribute.java    |  60 -------
 .../processors/kafka/pubsub/ConsumerLease.java  |   2 +-
 .../kafka/pubsub/PublishKafkaRecord_0_10.java   |  21 +--
 .../processors/kafka/pubsub/PublisherLease.java |   4 +-
 .../pubsub/TestPublishKafkaRecord_0_10.java     |  18 +--
 .../kafka/pubsub/util/MockRecordWriter.java     |   3 +-
 .../nifi/processors/parquet/PutParquet.java     |   2 +-
 .../processors/parquet/FetchParquetTest.java    |   3 +-
 .../nifi/processors/parquet/PutParquetTest.java |  68 +++-----
 .../record/script/ScriptedRecordSetWriter.java  |  22 +--
 .../script/ScriptedRecordSetWriterTest.groovy   |   4 +-
 .../groovy/test_record_writer_inline.groovy     |   2 +-
 .../standard/AbstractRecordProcessor.java       |  41 ++---
 .../standard/AbstractRouteRecord.java           |  13 +-
 .../processors/standard/PartitionRecord.java    |  12 +-
 .../nifi/processors/standard/QueryRecord.java   |  25 +--
 .../nifi/processors/standard/SplitRecord.java   |  12 +-
 .../processors/standard/TestQueryRecord.java    |  19 ++-
 .../serialization/RecordSetWriterFactory.java   |   5 +-
 .../java/org/apache/nifi/avro/AvroReader.java   |   2 +-
 .../apache/nifi/avro/AvroRecordSetWriter.java   |   2 +-
 .../avro/EmbeddedAvroSchemaAccessStrategy.java  |   2 +-
 .../nifi/csv/CSVHeaderSchemaStrategy.java       |   2 +-
 .../java/org/apache/nifi/csv/CSVReader.java     |   2 +-
 .../java/org/apache/nifi/grok/GrokReader.java   |   4 +-
 .../org/apache/nifi/json/JsonPathReader.java    |   2 +-
 .../org/apache/nifi/json/JsonTreeReader.java    |   2 +-
 .../SchemaRegistryRecordSetWriter.java          |  31 +++-
 .../serialization/SchemaRegistryService.java    |  19 ++-
 .../nifi/text/FreeFormTextRecordSetWriter.java  |   9 +-
 .../avro/TestWriteAvroResultWithoutSchema.java  |   4 +-
 .../nifi/csv/TestCSVHeaderSchemaStrategy.java   |   2 +-
 46 files changed, 605 insertions(+), 402 deletions(-)
----------------------------------------------------------------------
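Before the per-file diffs, the pivotal interface change in one sketch. RecordSetWriterFactory.getSchema(...) now receives the schema that was read from the incoming data instead of the FlowFile's content stream (see the MockRecordWriter and RecordSetWriterFactory hunks below). A hedged caller-side sketch, with record, flowFile, out, and the factory assumed in scope as in the processors below:

    // null is passed when no record could be read from the input.
    final RecordSchema writeSchema = recordSetWriterFactory.getSchema(
            flowFile, record == null ? null : record.getSchema());
    final RecordSetWriter writer = recordSetWriterFactory.createWriter(
            getLogger(), writeSchema, flowFile, out);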


http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 76438c5..87139c6 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -17,7 +17,25 @@
 
 package org.apache.nifi.avro;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import org.apache.avro.Conversions;
+import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -29,8 +47,6 @@ import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
-import org.apache.avro.JsonProperties;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
@@ -39,28 +55,15 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.MathContext;
-import java.nio.ByteBuffer;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 public class AvroTypeUtil {
     private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class);
     public static final String AVRO_SCHEMA_FORMAT = "avro";
@@ -72,30 +75,142 @@ public class AvroTypeUtil {
     private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
     private static final String LOGICAL_TYPE_DECIMAL = "decimal";
 
-    public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
+    public static Schema extractAvroSchema(final RecordSchema recordSchema) {
         if (recordSchema == null) {
             throw new IllegalArgumentException("RecordSchema cannot be null");
         }
 
         final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat();
         if (!schemaFormatOption.isPresent()) {
-            throw new SchemaNotFoundException("No Schema Format was present in the RecordSchema");
+            return buildAvroSchema(recordSchema);
         }
 
         final String schemaFormat = schemaFormatOption.get();
         if (!schemaFormat.equals(AVRO_SCHEMA_FORMAT)) {
-            throw new SchemaNotFoundException("Schema provided is not in Avro format");
+            return buildAvroSchema(recordSchema);
         }
 
         final Optional<String> textOption = recordSchema.getSchemaText();
         if (!textOption.isPresent()) {
-            throw new SchemaNotFoundException("No Schema text was present in the RecordSchema");
+            return buildAvroSchema(recordSchema);
         }
 
         final String text = textOption.get();
         return new Schema.Parser().parse(text);
     }
 
+    private static Schema buildAvroSchema(final RecordSchema recordSchema) {
+        final List<Field> avroFields = new ArrayList<>(recordSchema.getFieldCount());
+        for (final RecordField recordField : recordSchema.getFields()) {
+            avroFields.add(buildAvroField(recordField));
+        }
+
+        final Schema avroSchema = Schema.createRecord("nifiRecord", null, "org.apache.nifi", false, avroFields);
+        return avroSchema;
+    }
+
+    private static Field buildAvroField(final RecordField recordField) {
+        final Schema schema = buildAvroSchema(recordField.getDataType(), recordField.getFieldName());
+        final Field field = new Field(recordField.getFieldName(), schema, null, recordField.getDefaultValue());
+        for (final String alias : recordField.getAliases()) {
+            field.addAlias(alias);
+        }
+
+        return field;
+    }
+
+    private static Schema buildAvroSchema(final DataType dataType, final String fieldName) {
+        final Schema schema;
+
+        switch (dataType.getFieldType()) {
+            case ARRAY:
+                final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+                final DataType elementDataType = arrayDataType.getElementType();
+                if (RecordFieldType.BYTE.equals(elementDataType.getFieldType())) {
+                    schema = Schema.create(Type.BYTES);
+                } else {
+                    final Schema elementType = buildAvroSchema(elementDataType, fieldName);
+                    schema = Schema.createArray(elementType);
+                }
+                break;
+            case BIGINT:
+                schema = Schema.create(Type.STRING);
+                break;
+            case BOOLEAN:
+                schema = Schema.create(Type.BOOLEAN);
+                break;
+            case BYTE:
+                schema = Schema.create(Type.INT);
+                break;
+            case CHAR:
+                schema = Schema.create(Type.STRING);
+                break;
+            case CHOICE:
+                final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+                final List<DataType> options = choiceDataType.getPossibleSubTypes();
+
+                final List<Schema> unionTypes = new ArrayList<>(options.size());
+                for (final DataType option : options) {
+                    unionTypes.add(buildAvroSchema(option, fieldName));
+                }
+
+                schema = Schema.createUnion(unionTypes);
+                break;
+            case DATE:
+                schema = Schema.create(Type.INT);
+                LogicalTypes.date().addToSchema(schema);
+                break;
+            case DOUBLE:
+                schema = Schema.create(Type.DOUBLE);
+                break;
+            case FLOAT:
+                schema = Schema.create(Type.FLOAT);
+                break;
+            case INT:
+                schema = Schema.create(Type.INT);
+                break;
+            case LONG:
+                schema = Schema.create(Type.LONG);
+                break;
+            case MAP:
+                schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName));
+                break;
+            case RECORD:
+                final RecordDataType recordDataType = (RecordDataType) dataType;
+                final RecordSchema childSchema = recordDataType.getChildSchema();
+
+                final List<Field> childFields = new ArrayList<>(childSchema.getFieldCount());
+                for (final RecordField field : childSchema.getFields()) {
+                    childFields.add(buildAvroField(field));
+                }
+
+                schema = Schema.createRecord(fieldName + "Type", null, "org.apache.nifi", false, childFields);
+                break;
+            case SHORT:
+                schema = Schema.create(Type.INT);
+                break;
+            case STRING:
+                schema = Schema.create(Type.STRING);
+                break;
+            case TIME:
+                schema = Schema.create(Type.INT);
+                LogicalTypes.timeMillis().addToSchema(schema);
+                break;
+            case TIMESTAMP:
+                schema = Schema.create(Type.LONG);
+                LogicalTypes.timestampMillis().addToSchema(schema);
+                break;
+            default:
+                return null;
+        }
+
+        return nullable(schema);
+    }
+
+    private static Schema nullable(final Schema schema) {
+        return Schema.createUnion(Schema.create(Type.NULL), schema);
+    }
+
     /**
      * Returns a DataType for the given Avro Schema
      *

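A short illustration of the new fallback in extractAvroSchema: when no Avro schema text is attached to the RecordSchema, the schema is built field-by-field via buildAvroSchema, and every generated field becomes a union with NULL. A JUnit-style sketch mirroring TestAvroTypeUtil further down:

    final List<RecordField> fields = new ArrayList<>();
    fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
    fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));

    // No schema format or text is present, so the field-by-field path is taken.
    final Schema avroSchema = AvroTypeUtil.extractAvroSchema(new SimpleRecordSchema(fields));

    // The result is a record named "nifiRecord" in namespace "org.apache.nifi"
    // whose fields are [null, string] and [null, int] unions.
    assertEquals(Schema.Type.UNION, avroSchema.getField("name").schema().getType());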
http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
index 3155909..5bf084e 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
@@ -40,7 +40,7 @@ public class AvroSchemaTextStrategy implements SchemaAccessStrategy {
     }
 
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
         final String schemaText = schemaTextPropertyValue.evaluateAttributeExpressions(flowFile).getValue();
         if (schemaText == null || schemaText.trim().isEmpty()) {
             throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Text");

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
index cab9c02..b335b11 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
@@ -43,6 +43,10 @@ public class SchemaAccessUtils {
                     + "found at https://github.com/hortonworks/registry");
     public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
             "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
+    public static final AllowableValue INHERIT_RECORD_SCHEMA = new AllowableValue("inherit-record-schema", "Inherit Record Schema",
+        "The schema used to write records will be the same schema that was given to the Record when the Record was created.");
+
+
 
     public  static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
             .name("schema-registry")
@@ -117,6 +121,8 @@ public class SchemaAccessUtils {
     public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ProcessContext context) {
         if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
             return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+        } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
+            return new InheritSchemaFromRecord();
         } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
             return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
         } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
@@ -131,6 +137,8 @@ public class SchemaAccessUtils {
     public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
         if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
             return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+        } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
+            return new InheritSchemaFromRecord();
         } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
             return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
         } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
@@ -145,6 +153,8 @@ public class SchemaAccessUtils {
     public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
         if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
             return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+        } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
+            return new InheritSchemaFromRecord();
         } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
             return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
         } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {

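With the new allowable value registered, any of the three overloads above dispatches to the new strategy; a sketch, assuming a schemaRegistry and context are in scope:

    final SchemaAccessStrategy strategy = SchemaAccessUtils.getSchemaAccessStrategy(
            SchemaAccessUtils.INHERIT_RECORD_SCHEMA.getValue(), schemaRegistry, context);
    // Returns an InheritSchemaFromRecord: no Schema Registry lookup, schema
    // name, or schema text is consulted when writing.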
http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
new file mode 100644
index 0000000..d9be673
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
+
+    @Override
+    public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
+    }
+
+    @Override
+    public Map<String, String> getAttributes(final RecordSchema schema) {
+        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
+        final String schemaText = avroSchema.toString();
+        return Collections.singletonMap("avro.schema", schemaText);
+    }
+
+    @Override
+    public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
+    }
+
+    @Override
+    public Set<SchemaField> getRequiredSchemaFields() {
+        return EnumSet.noneOf(SchemaField.class);
+    }
+}

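Usage sketch for the strategy above: the schema travels as a FlowFile attribute rather than as a header in the content stream (applying the attribute map via the session, as shown, is an assumption and not part of this class):

    final SchemaAccessWriter schemaWriter = new WriteAvroSchemaAttributeStrategy();
    final Map<String, String> attributes = schemaWriter.getAttributes(recordSchema);
    // attributes holds a single entry: "avro.schema" -> the schema as JSON.
    flowFile = session.putAllAttributes(flowFile, attributes);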
http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
new file mode 100644
index 0000000..fe19733
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+
+public class TestAvroTypeUtil {
+
+    @Test
+    public void testCreateAvroSchemaPrimitiveTypes() throws SchemaNotFoundException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("int", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("long", RecordFieldType.LONG.getDataType()));
+        fields.add(new RecordField("string", RecordFieldType.STRING.getDataType(), "hola", Collections.singleton("greeting")));
+        fields.add(new RecordField("byte", RecordFieldType.BYTE.getDataType()));
+        fields.add(new RecordField("char", RecordFieldType.CHAR.getDataType()));
+        fields.add(new RecordField("short", RecordFieldType.SHORT.getDataType()));
+        fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
+        fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
+        fields.add(new RecordField("time", RecordFieldType.TIME.getDataType()));
+        fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+        fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
+
+        final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+        fields.add(new RecordField("strings", arrayType));
+
+        final DataType mapType = RecordFieldType.MAP.getMapDataType(RecordFieldType.LONG.getDataType());
+        fields.add(new RecordField("map", mapType));
+
+
+        final List<RecordField> personFields = new ArrayList<>();
+        personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        personFields.add(new RecordField("dob", RecordFieldType.DATE.getDataType()));
+        final RecordSchema personSchema = new SimpleRecordSchema(personFields);
+        final DataType personType = RecordFieldType.RECORD.getRecordDataType(personSchema);
+        fields.add(new RecordField("person", personType));
+
+
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
+
+        // everything should be a union, since it's nullable.
+        for (final Field field : avroSchema.getFields()) {
+            final Schema fieldSchema = field.schema();
+            assertEquals(Type.UNION, fieldSchema.getType());
+            assertTrue("Field " + field.name() + " does not contain NULL type", fieldSchema.getTypes().contains(Schema.create(Type.NULL)));
+        }
+
+        final RecordSchema afterConversion = AvroTypeUtil.createSchema(avroSchema);
+
+        assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("int").get());
+        assertEquals(RecordFieldType.LONG.getDataType(), afterConversion.getDataType("long").get());
+        assertEquals(RecordFieldType.STRING.getDataType(), afterConversion.getDataType("string").get());
+        assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("byte").get());
+        assertEquals(RecordFieldType.STRING.getDataType(), afterConversion.getDataType("char").get());
+        assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("short").get());
+        assertEquals(RecordFieldType.DOUBLE.getDataType(), afterConversion.getDataType("double").get());
+        assertEquals(RecordFieldType.FLOAT.getDataType(), afterConversion.getDataType("float").get());
+        assertEquals(RecordFieldType.TIME.getDataType(), afterConversion.getDataType("time").get());
+        assertEquals(RecordFieldType.DATE.getDataType(), afterConversion.getDataType("date").get());
+        assertEquals(RecordFieldType.TIMESTAMP.getDataType(), afterConversion.getDataType("timestamp").get());
+        assertEquals(arrayType, afterConversion.getDataType("strings").get());
+        assertEquals(mapType, afterConversion.getDataType("map").get());
+        assertEquals(personType, afterConversion.getDataType("person").get());
+
+        final RecordField stringField = afterConversion.getField("string").get();
+        assertEquals("hola", stringField.getDefaultValue());
+        assertEquals(Collections.singleton("greeting"), stringField.getAliases());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index 7cc6bb5..fbbbbf4 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -16,7 +16,22 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import org.apache.commons.io.input.NullInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,29 +52,11 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.StopWatch;
 
-import java.io.BufferedOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * Base processor for reading a data from HDFS that can be fetched into records.
  */
@@ -187,7 +184,6 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
                 final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
 
                 final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-                final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, new NullInputStream(0));
 
                 final StopWatch stopWatch = new StopWatch(true);
 
@@ -200,22 +196,20 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
                     try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
                          final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
 
-                        final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
+                        Record record = recordReader.nextRecord();
+                        final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, record == null ? null : record.getSchema());
 
-                        final RecordSet recordSet = new RecordSet() {
-                            @Override
-                            public RecordSchema getSchema() throws IOException {
-                                return emptySchema;
+                        try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) {
+                            recordSetWriter.beginRecordSet();
+                            if (record != null) {
+                                recordSetWriter.write(record);
                             }
 
-                            @Override
-                            public Record next() throws IOException {
-                                return recordReader.nextRecord();
+                            while ((record = recordReader.nextRecord()) != null) {
+                                recordSetWriter.write(record);
                             }
-                        };
 
-                        try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) {
-                            writeResult.set(recordSetWriter.write(recordSet));
+                            writeResult.set(recordSetWriter.finishRecordSet());
                             mimeTypeRef.set(recordSetWriter.getMimeType());
                         }
                     } catch (Exception e) {
@@ -247,7 +241,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
 
             } catch (final FileNotFoundException | AccessControlException e) {
                 getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, e});
-                final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage());
+                final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage() == null ? e.toString() : e.getMessage());
                 session.transfer(failureFlowFile, REL_FAILURE);
             } catch (final IOException | FlowFileAccessException e) {
                 getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to retry", new Object[] {filenameValue, originalFlowFile, e});
@@ -255,7 +249,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
                 context.yield();
             } catch (final Throwable t) {
                 getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, t});
-                final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, t.getMessage());
+                final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, t.getMessage() == null ? t.toString() : t.getMessage());
                 session.transfer(failureFlowFile, REL_FAILURE);
             }
 

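The rewritten fetch path, reduced to its essentials: the first record is read eagerly so the writer can inherit its schema, and that same record must then be written ahead of the drain loop or it would be silently dropped. A condensed sketch of the hunk above:

    Record record = recordReader.nextRecord();
    final RecordSchema schema = recordSetWriterFactory.getSchema(
            originalFlowFile, record == null ? null : record.getSchema());

    try (final RecordSetWriter writer =
             recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) {
        writer.beginRecordSet();
        if (record != null) {
            writer.write(record); // the record consumed for schema discovery
        }
        while ((record = recordReader.nextRecord()) != null) {
            writer.write(record);
        }
        writeResult.set(writer.finishRecordSet());
        mimeTypeRef.set(writer.getMimeType());
    }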
http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index 6676ee6..70a3697 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -16,6 +16,21 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -28,8 +43,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
@@ -42,10 +55,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.exception.FailureException;
 import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
 import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
-import org.apache.nifi.schema.access.SchemaAccessStrategy;
-import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.WriteResult;
@@ -53,32 +63,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.StopWatch;
 
-import java.io.BufferedInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
-
 /**
  * Base class for processors that write Records to HDFS.
  */
@@ -156,18 +140,10 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
 
     private volatile String remoteOwner;
     private volatile String remoteGroup;
-    private volatile SchemaAccessStrategy schemaAccessStrategy;
 
     private volatile Set<Relationship> putHdfsRecordRelationships;
     private volatile List<PropertyDescriptor> putHdfsRecordProperties;
 
-    private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
-            SCHEMA_NAME_PROPERTY,
-            SCHEMA_TEXT_PROPERTY,
-            HWX_SCHEMA_REF_ATTRIBUTES,
-            HWX_CONTENT_ENCODED_SCHEMA
-    ));
-
 
     @Override
     protected final void init(final ProcessorInitializationContext context) {
@@ -187,19 +163,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
                 .description("The parent directory to which files should be written. Will be created if it doesn't exist.")
                 .build());
 
-        final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
-
-        props.add(new PropertyDescriptor.Builder()
-                .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
-                .description("Specifies how to obtain the schema that is to be used for writing the data.")
-                .allowableValues(strategies)
-                .defaultValue(getDefaultSchemaAccessStrategy().getValue())
-                .build());
-
-        props.add(SCHEMA_REGISTRY);
-        props.add(SCHEMA_NAME);
-        props.add(SCHEMA_TEXT);
-
         final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]);
 
         props.add(new PropertyDescriptor.Builder()
@@ -216,18 +179,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
         this.putHdfsRecordProperties = Collections.unmodifiableList(props);
     }
 
-    protected List<AllowableValue> getSchemaAccessStrategyValues() {
-        return strategyList;
-    }
-
-    protected AllowableValue getDefaultSchemaAccessStrategy() {
-        return SCHEMA_NAME_PROPERTY;
-    }
-
-    private PropertyDescriptor getSchemaAcessStrategyDescriptor() {
-        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
-    }
-
     /**
      * @param context the initialization context
      * @return the possible compression types
@@ -259,22 +210,11 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
        return putHdfsRecordProperties;
     }
 
-    @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-        final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
-        return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues());
-    }
 
     @OnScheduled
     public final void onScheduled(final ProcessContext context) throws IOException {
         super.abstractOnScheduled(context);
 
-        final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
-
-        final PropertyDescriptor descriptor = getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
-        final String schemaAccess = context.getProperty(descriptor).getValue();
-        this.schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
-
         this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
         this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
 
@@ -365,8 +305,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
                     HDFSRecordWriter recordWriter = null;
 
                     try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
-                        final RecordSchema destRecordSchema = schemaAccessStrategy.getSchema(flowFile, in);
-                        recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, destRecordSchema);
 
                         // if we fail to create the RecordReader then we want to route to failure, so we need to
                         // handle this separately from the other IOExceptions which normally route to retry
@@ -379,8 +317,9 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
                         }
 
                         final RecordSet recordSet = recordReader.createRecordSet();
-                        writeResult.set(recordWriter.write(recordSet));
 
+                        recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, recordReader.getSchema());
+                        writeResult.set(recordWriter.write(recordSet));
                     } catch (Exception e) {
                         exceptionHolder.set(e);
                     } finally {

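The put side applies the same inversion: the write schema now comes from the RecordReader itself rather than from a separately configured schema-access strategy, so the HDFS writer is created only after the reader exists. Condensed from the hunk above (reader creation elided, as in the diff):

    final RecordSet recordSet = recordReader.createRecordSet();

    // The schema is taken from the reader; the removed SCHEMA_ACCESS_STRATEGY,
    // SCHEMA_REGISTRY, SCHEMA_NAME and SCHEMA_TEXT properties are gone.
    recordWriter = createHDFSRecordWriter(context, flowFile, configuration,
            tempFile, recordReader.getSchema());
    writeResult.set(recordWriter.write(recordSet));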
http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index 891bbe3..9bde647 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.serialization.record;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collections;
 
@@ -55,7 +54,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
     }
 
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream content) throws SchemaNotFoundException, IOException {
+    public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema schema) throws SchemaNotFoundException, IOException {
         return new SimpleRecordSchema(Collections.emptyList());
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
index d1f0e8a..073a453 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
@@ -47,7 +47,7 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
     }
 
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
         final String schemaIdentifier = flowFile.getAttribute(SCHEMA_ID_ATTRIBUTE);
         final String schemaVersion = flowFile.getAttribute(SCHEMA_VERSION_ATTRIBUTE);
         final String schemaProtocol = flowFile.getAttribute(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
index de89900..b2e5a48 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
@@ -45,7 +45,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
     }
 
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
         final byte[] buffer = new byte[13];
         try {
             StreamUtils.fillBuffer(contentStream, buffer);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java
new file mode 100644
index 0000000..d1ed63d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class InheritSchemaFromRecord implements SchemaAccessStrategy {
+
+    @Override
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+        if (readSchema == null) {
+            throw new SchemaNotFoundException("Cannot inherit Schema from Record because no schema was found");
+        }
+
+        return readSchema;
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return EnumSet.allOf(SchemaField.class);
+    }
+
+}

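Behavioral sketch for the class above: the read schema is handed straight through, and the only failure mode is its absence:

    final SchemaAccessStrategy strategy = new InheritSchemaFromRecord();

    // Succeeds: the writer inherits exactly the schema the reader produced.
    final RecordSchema writeSchema = strategy.getSchema(flowFile, contentStream, readSchema);

    // Throws SchemaNotFoundException: there is nothing to inherit.
    strategy.getSchema(flowFile, contentStream, null);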
http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java
new file mode 100644
index 0000000..75dedc5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class NopSchemaAccessWriter implements SchemaAccessWriter {
+
+    @Override
+    public void writeHeader(RecordSchema schema, OutputStream out) throws IOException {
+    }
+
+    @Override
+    public Map<String, String> getAttributes(RecordSchema schema) {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
+    }
+
+    @Override
+    public Set<SchemaField> getRequiredSchemaFields() {
+        return EnumSet.noneOf(SchemaField.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
index 68f9ecf..923eaf0 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
@@ -30,9 +30,10 @@ public interface SchemaAccessStrategy {
      *
      * @param flowFile flowfile
      * @param contentStream content of flowfile
+     * @param readSchema the schema that was read from the input FlowFile, or <code>null</code> if there was none
      * @return the RecordSchema for the FlowFile
      */
-    RecordSchema getSchema(FlowFile flowFile, InputStream contentStream) throws SchemaNotFoundException, IOException;
+    RecordSchema getSchema(FlowFile flowFile, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
 
     /**
      * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #getSchema(FlowFile, InputStream)}.

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
index d59e5da..796e1e4 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
@@ -43,7 +43,7 @@ public class SchemaNamePropertyStrategy implements SchemaAccessStrategy {
     }
 
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
         final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue();
         if (schemaName.trim().isEmpty()) {
             throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Name.");

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
deleted file mode 100644
index f39bdca..0000000
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.schema.access;
-
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-import org.apache.nifi.serialization.record.RecordSchema;
-
-public class SchemaTextAsAttribute implements SchemaAccessWriter {
-    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
-
-    @Override
-    public void writeHeader(final RecordSchema schema, final OutputStream out) {
-    }
-
-    @Override
-    public Map<String, String> getAttributes(final RecordSchema schema) {
-        final Optional<String> textFormatOption = schema.getSchemaFormat();
-        final Optional<String> textOption = schema.getSchemaText();
-        return Collections.singletonMap(textFormatOption.get() + ".schema", textOption.get());
-    }
-
-    @Override
-    public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
-        final Optional<String> textFormatOption = schema.getSchemaFormat();
-        if (!textFormatOption.isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text Format is not present");
-        }
-
-        final Optional<String> textOption = schema.getSchemaText();
-        if (!textOption.isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text is not present");
-        }
-    }
-
-    @Override
-    public Set<SchemaField> getRequiredSchemaFields() {
-        return schemaFields;
-    }
-}
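
Worth noting: the removed getAttributes() above called Optional.get() without
checking presence, relying on validateSchema() having been invoked first. A
self-contained sketch of the null-safe equivalent (hypothetical helper, shown
only to illustrate the pattern):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Optional;

    import org.apache.nifi.schema.access.SchemaNotFoundException;
    import org.apache.nifi.serialization.record.RecordSchema;

    class SchemaTextAttributeSketch {
        static Map<String, String> toAttribute(final RecordSchema schema) throws SchemaNotFoundException {
            final Optional<String> format = schema.getSchemaFormat();
            final Optional<String> text = schema.getSchemaText();
            if (!format.isPresent() || !text.isPresent()) {
                // Fail fast instead of risking NoSuchElementException from Optional.get()
                throw new SchemaNotFoundException("Schema Text or Schema Text Format is not present");
            }
            return Collections.singletonMap(format.get() + ".schema", text.get());
        }
    }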

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index ee6b1ff..242c917 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -465,7 +465,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
                     final RecordSchema writeSchema;
                     try {
-                        writeSchema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
+                        writeSchema = writerFactory.getSchema(flowFile, recordSchema);
                     } catch (final Exception e) {
                         logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
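
The write schema is now derived from the schema that was already read from the
incoming records, rather than by re-parsing the first message's bytes. A
hedged sketch of the new two-argument contract (helper name hypothetical; the
factory may return the read schema as-is or resolve a different one, e.g. from
a registry):

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.serialization.RecordReader;
    import org.apache.nifi.serialization.RecordSetWriterFactory;
    import org.apache.nifi.serialization.record.RecordSchema;

    class WriteSchemaSketch {
        static RecordSchema resolveWriteSchema(final RecordSetWriterFactory factory,
                final FlowFile flowFile, final RecordReader reader) throws Exception {
            final RecordSchema readSchema = reader.getSchema(); // schema inherited from the input
            return factory.getSchema(flowFile, readSchema);     // writer decides the final schema
        }
    }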
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
index e48568b..21b2e31 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
@@ -59,6 +59,7 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
 
 @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"})
 @CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 0.10.x Producer API. "
@@ -309,6 +310,8 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
 
         final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
         final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
 
         final long startTime = System.nanoTime();
         try (final PublisherLease lease = pool.obtainPublisher()) {
@@ -323,24 +326,16 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
                 final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
                 final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
 
-                final RecordSchema schema;
-                final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-                try (final InputStream in = new BufferedInputStream(session.read(flowFile))) {
-                    schema = writerFactory.getSchema(flowFile, in);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to determine Schema for writing messages to Kafka for {}; routing to failure", new Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
-                    continue;
-                }
-
                 try {
                     session.read(flowFile, new InputStreamCallback() {
                         @Override
                         public void process(final InputStream rawIn) throws IOException {
                             try (final InputStream in = new BufferedInputStream(rawIn)) {
-                                final RecordReader reader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class).createRecordReader(flowFile, in, getLogger());
-                                lease.publish(flowFile, reader, writerFactory, schema, messageKeyField, topic);
+                                final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+                                final RecordSet recordSet = reader.createRecordSet();
+
+                                final RecordSchema schema = writerFactory.getSchema(flowFile, recordSet.getSchema());
+                                lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic);
                             } catch (final SchemaNotFoundException | MalformedRecordException e) {
                                 throw new ProcessException(e);
                             }
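
The net effect of this rearrangement is a single pass over the FlowFile
content: the reader, the RecordSet, and the write schema (derived via
recordSet.getSchema()) are all obtained inside one read callback, instead of
opening the stream once to determine the schema and again to publish.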

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 4238956..2004346 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -33,7 +33,6 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.Record;
@@ -96,7 +95,7 @@ public class PublisherLease implements Closeable {
         }
     }
 
-    void publish(final FlowFile flowFile, final RecordReader reader, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
+    void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
         final String messageKeyField, final String topic) throws IOException {
         if (tracker == null) {
             tracker = new InFlightMessageTracker();
@@ -105,7 +104,6 @@ public class PublisherLease implements Closeable {
         final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
 
         Record record;
-        final RecordSet recordSet = reader.createRecordSet();
         int recordCount = 0;
 
         try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, baos)) {
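
With publish() now accepting a RecordSet, the lease drives the per-record loop
itself. A minimal sketch of that loop under the signature above (class name
hypothetical; the per-record flush is an assumption about the writer's
behavior):

    import java.io.ByteArrayOutputStream;

    import org.apache.nifi.serialization.RecordSetWriter;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordSet;

    class PublishLoopSketch {
        static int writeAll(final RecordSet recordSet, final RecordSetWriter writer,
                final ByteArrayOutputStream baos) throws Exception {
            int recordCount = 0;
            Record record;
            while ((record = recordSet.next()) != null) {
                baos.reset();          // reuse the buffer for each record
                writer.write(record);  // serialize a single record
                writer.flush();        // assumption: flush pushes the record into baos
                recordCount++;
                // ... baos.toByteArray() would be handed to the Kafka producer here ...
            }
            return recordCount;
        }
    }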

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
index 7cff2a7..5c59c66 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
@@ -42,10 +42,10 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -65,7 +65,7 @@ public class TestPublishKafkaRecord_0_10 {
     public void setup() throws InitializationException, IOException {
         mockPool = mock(PublisherPool.class);
         mockLease = mock(PublisherLease.class);
-        Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class),
+        Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
             any(RecordSchema.class), any(String.class), any(String.class));
 
         when(mockPool.obtainPublisher()).thenReturn(mockLease);
@@ -104,7 +104,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
 
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -123,7 +123,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 3);
 
-        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -138,7 +138,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 1);
 
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -155,7 +155,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 3);
 
-        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
 
-        verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
@@ -207,7 +207,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
 
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -241,7 +241,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
         runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2);
 
-        verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
index 1549626..60e494b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.processors.kafka.pubsub.util;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collections;
 
@@ -53,7 +52,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
     }
 
     @Override
-    public RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException {
+    public RecordSchema getSchema(FlowFile flowFile, RecordSchema schema) throws SchemaNotFoundException, IOException {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
index 934eb59..b9d2e42 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
@@ -185,7 +185,7 @@ public class PutParquet extends AbstractPutHDFSRecord {
         return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema);
     }
 
-    private void applyCommonConfig(final ParquetWriter.Builder builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
+    private void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
         builder.withConf(conf);
 
         // Required properties

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
index 6ecfa59..ffff2a3 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
@@ -45,7 +45,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -217,7 +216,7 @@ public class FetchParquetTest {
         configure(proc);
 
         final RecordSetWriter recordSetWriter = Mockito.mock(RecordSetWriter.class);
-        when(recordSetWriter.write(any(RecordSet.class))).thenThrow(new IOException("IOException"));
+        when(recordSetWriter.write(any(Record.class))).thenThrow(new IOException("IOException"));
 
         final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class);
         when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory");
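
The stub now fails on the per-record write(Record) call rather than on
write(RecordSet), reflecting that the fetch path invokes the writer once per
record instead of once for the whole set.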

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
index 3a07dce..e634e2e 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
@@ -16,12 +16,25 @@
  */
 package org.apache.nifi.processors.parquet;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.BasicConfigurator;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -32,7 +45,6 @@ import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
@@ -49,21 +61,10 @@ import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.when;
-
 
 public class PutParquetTest {
 
@@ -76,6 +77,10 @@ public class PutParquetTest {
     private MockRecordParser readerFactory;
     private TestRunner testRunner;
 
+    @BeforeClass
+    public static void setupLogging() {
+        BasicConfigurator.configure();
+    }
 
     @Before
     public void setup() throws IOException, InitializationException {
@@ -108,8 +113,6 @@ public class PutParquetTest {
         testRunner.enableControllerService(readerFactory);
 
         testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory");
-        testRunner.setProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
-        testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, schema.toString());
     }
 
     @Test
@@ -325,7 +328,6 @@ public class PutParquetTest {
     @Test
     public void testValidSchemaWithELShouldBeSuccessful() throws InitializationException, IOException {
         configure(proc, 10);
-        testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
 
         final String filename = "testValidSchemaWithELShouldBeSuccessful-" + System.currentTimeMillis();
 
@@ -340,39 +342,6 @@ public class PutParquetTest {
     }
 
     @Test
-    public void testSchemaWithELMissingShouldRouteToFailure() throws InitializationException, IOException {
-        configure(proc, 10);
-        testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
-
-        final String filename = "testSchemaWithELMissingShouldRouteToFailure-" + System.currentTimeMillis();
-
-        // don't provide my.schema as an attribute
-        final Map<String,String> flowFileAttributes = new HashMap<>();
-        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
-
-        testRunner.enqueue("trigger", flowFileAttributes);
-        testRunner.run();
-        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
-    }
-
-    @Test
-    public void testInvalidSchemaShouldRouteToFailure() throws InitializationException, IOException {
-        configure(proc, 10);
-        testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
-
-        final String filename = "testInvalidSchemaShouldRouteToFailure-" + System.currentTimeMillis();
-
-        // don't provide my.schema as an attribute
-        final Map<String,String> flowFileAttributes = new HashMap<>();
-        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
-        flowFileAttributes.put("my.schema", "NOT A SCHEMA");
-
-        testRunner.enqueue("trigger", flowFileAttributes);
-        testRunner.run();
-        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
-    }
-
-    @Test
     public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
         configure(proc, 10);
 
@@ -427,6 +396,7 @@ public class PutParquetTest {
 
         final RecordReader recordReader = Mockito.mock(RecordReader.class);
         when(recordReader.createRecordSet()).thenReturn(recordSet);
+        when(recordReader.getSchema()).thenReturn(AvroTypeUtil.createSchema(schema));
 
         final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
         when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");