Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/01 20:12:01 UTC

[08/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet - Creating nifi-records-utils to share utility code from record services - Refactoring Parquet tests to use MockRecorderParser and MockRecordWriter - R

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
new file mode 100644
index 0000000..8d4f875
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.put.sender;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Sends messages over a SocketChannel.
+ */
+public class SocketChannelSender extends ChannelSender {
+
+    protected SocketChannel channel;
+    protected SocketChannelOutputStream socketChannelOutput;
+
+    public SocketChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
+        super(host, port, maxSendBufferSize, logger);
+    }
+
+    @Override
+    public void open() throws IOException {
+        if (channel == null) {
+            channel = SocketChannel.open();
+            channel.configureBlocking(false);
+
+            if (maxSendBufferSize > 0) {
+                channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
+                final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF);
+                if (actualSendBufSize < maxSendBufferSize) {
+                    logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize
+                            + " bytes but could only set to " + actualSendBufSize + "bytes. You may want to "
+                            + "consider changing the Operating System's maximum send buffer");
+                }
+            }
+        }
+
+        if (!channel.isConnected()) {
+            final long startTime = System.currentTimeMillis();
+            final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getByName(host), port);
+
+            if (!channel.connect(socketAddress)) {
+                while (!channel.finishConnect()) {
+                    if (System.currentTimeMillis() > startTime + timeout) {
+                        throw new SocketTimeoutException("Timed out connecting to " + host + ":" + port);
+                    }
+
+                    try {
+                        Thread.sleep(50L);
+                    } catch (final InterruptedException e) {
+                        // restore the interrupt status so callers can observe the interruption
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+
+            socketChannelOutput = new SocketChannelOutputStream(channel);
+            socketChannelOutput.setTimeout(timeout);
+        }
+    }
+
+    @Override
+    protected void write(byte[] data) throws IOException {
+        socketChannelOutput.write(data);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return channel != null && channel.isConnected();
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(socketChannelOutput);
+        IOUtils.closeQuietly(channel);
+        socketChannelOutput = null;
+        channel = null;
+    }
+}
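
Editor's note: a minimal usage sketch for the class above, not part of the commit. It relies only on the lifecycle shown in this hunk (open/isConnected/close); the send(byte[]) call is assumed to be inherited from the abstract ChannelSender base class (which is not in this hunk and ultimately invokes the write() override above), the class name is invented, the null ComponentLog is just to keep the sketch short (a real processor would pass getLogger()), and any connect timeout would normally be configured via the base class.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processor.util.put.sender.SocketChannelSender;

    public class SocketChannelSenderSketch {
        public static void main(final String[] args) throws IOException {
            final ComponentLog logger = null; // placeholder; only used by open() when a send buffer size is requested
            final SocketChannelSender sender = new SocketChannelSender("localhost", 5140, 0, logger);
            try {
                sender.open();                 // connects using the non-blocking connect/timeout loop shown above
                if (sender.isConnected()) {
                    // send(byte[]) is assumed from ChannelSender and delegates to the write() override above
                    sender.send("hello\n".getBytes(StandardCharsets.UTF_8));
                }
            } finally {
                sender.close();                // quietly closes the output stream and channel
            }
        }
    }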

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
new file mode 100644
index 0000000..bd73379
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.processor.exception.ProcessException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestExceptionHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestExceptionHandler.class);
+
+    /**
+     * Simulate an external procedure.
+     */
+    static class ExternalProcedure {
+        private boolean available = true;
+        int divide(Integer a, Integer b) throws Exception {
+            if (!available) {
+                throw new IOException("Not available");
+            }
+            if (a == 10) {
+                throw new IllegalStateException("Service for 10 is not currently available.");
+            }
+            return a / b;
+        }
+    }
+
+    private class Context {
+        int count = 0;
+    }
+
+    @Test
+    public void testBasicUsage() {
+
+        final ExternalProcedure p = new ExternalProcedure();
+
+        try {
+            // Although an exception has to be caught around each possible call,
+            // the error handling logic is usually the same,
+            // which ends up duplicating a lot of code.
+            final int r1 = p.divide(4, 2);
+            assertEquals(2, r1);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        final Context context = new Context();
+        final ExceptionHandler<Context> handler = new ExceptionHandler<>();
+
+        // Using the handler avoids repeating the try-catch block by reusing the error handling logic.
+        handler.execute(context, 6, i -> {
+            final int r2 = p.divide(i, 2);
+            assertEquals(3, r2);
+        });
+
+        // If a return value is needed, use an AtomicReference.
+        AtomicReference<Integer> r = new AtomicReference<>();
+        handler.execute(context, 8, i -> r.set(p.divide(i, 2)));
+        assertEquals(4, r.get().intValue());
+
+        // If no exception mapping is specified, any Exception thrown is wrapped in a ProcessException.
+        try {
+            final Integer nullInput = null;
+            handler.execute(context, nullInput, i -> r.set(p.divide(i, 2)));
+            fail("Exception should be thrown because input is null.");
+        } catch (ProcessException e) {
+            assertTrue(e.getCause() instanceof NullPointerException);
+        }
+    }
+
+    // Reusable Exception mapping function.
+    static Function<Exception, ErrorTypes> exceptionMapping = i -> {
+        try {
+            throw i;
+        } catch (NullPointerException | ArithmeticException | NumberFormatException e) {
+            return ErrorTypes.InvalidInput;
+        } catch (IllegalStateException e) {
+            return ErrorTypes.TemporalInputFailure;
+        } catch (IOException e) {
+            return ErrorTypes.TemporalFailure;
+        } catch (Exception e) {
+            throw new ProcessException(e);
+        }
+    };
+
+    @Test
+    public void testHandling() {
+
+        final ExternalProcedure p = new ExternalProcedure();
+        final Context context = new Context();
+
+        final ExceptionHandler<Context> handler = new ExceptionHandler<>();
+        handler.mapException(exceptionMapping);
+        handler.onError(createInputErrorHandler());
+
+        // The benefit of the handler is that error handling is externalized, which keeps the calling code simpler.
+        handler.execute(context, 4, i -> {
+            final int r = p.divide(i, 2);
+            assertEquals(2, r);
+        });
+
+        // Null pointer exception.
+        final Integer input = null;
+        handler.execute(context, input, i -> {
+            p.divide(i, 2);
+            fail("Shouldn't reach here.");
+        });
+
+        // Divide by zero.
+        handler.execute(context, 0, i -> {
+            p.divide(2, i);
+            fail("Shouldn't reach here.");
+        });
+
+
+    }
+
+    static <C> ExceptionHandler.OnError<C, Integer> createInputErrorHandler() {
+        return (c, i, r, e) -> {
+            switch (r.destination()) {
+                case ProcessException:
+                    throw new ProcessException(String.format("Execution failed due to %s", e), e);
+                default:
+                    logger.warn(String.format("Routing to %s: %d caused %s", r, i, e));
+            }
+        };
+    }
+
+    static <C> ExceptionHandler.OnError<C, Integer[]> createArrayInputErrorHandler() {
+        return (c, i, r, e) -> {
+            switch (r.destination()) {
+                case ProcessException:
+                    throw new ProcessException(String.format("Execution failed due to %s", e), e);
+                default:
+                    logger.warn(String.format("Routing to %s: %d, %d caused %s", r, i[0], i[1], e));
+            }
+        };
+    }
+
+    @Test
+    public void testHandlingLoop() {
+
+        final ExternalProcedure p = new ExternalProcedure();
+        final Context context = new Context();
+
+        final ExceptionHandler<Context> handler = new ExceptionHandler<>();
+        handler.mapException(exceptionMapping);
+        handler.onError(createArrayInputErrorHandler());
+
+        // The handler is especially handy when looping through inputs. Each input array is {a, b, expected result}.
+        Integer[][] inputs = new Integer[][]{{4, 2, 2}, {null, 2, 999}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
+
+        Arrays.stream(inputs).forEach(input ->  handler.execute(context, input, (in) -> {
+            final Integer r = p.divide(in[0], in[1]);
+            // This is safe because if p.divide throws an exception, this code won't be executed.
+            assertEquals(in[2], r);
+        }));
+
+        AtomicReference<Integer> r = new AtomicReference<>();
+        for (Integer[] input : inputs) {
+
+            if (!handler.execute(context, input, (in) -> {
+                r.set(p.divide(in[0], in[1]));
+                context.count++;
+            })){
+                // The handler returns false when execution fails,
+                // so an "if it failed, continue with the next input" flow reads cleanly.
+                continue;
+            }
+
+            assertEquals(input[2], r.get());
+        }
+
+        assertEquals("Successful inputs", 2, context.count);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
new file mode 100644
index 0000000..6d73759
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.pattern.TestExceptionHandler.ExternalProcedure;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.createArrayInputErrorHandler;
+import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.exceptionMapping;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestRollbackOnFailure {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestRollbackOnFailure.class);
+
+    /**
+     * This serves as an example of how to compose an ExceptionHandler instance from reusable functions.
+     * @param logger used to log messages within functions
+     * @return a composed ExceptionHandler
+     */
+    private ExceptionHandler<RollbackOnFailure> getContextAwareExceptionHandler(ComponentLog logger) {
+        final ExceptionHandler<RollbackOnFailure> handler = new ExceptionHandler<>();
+        handler.mapException(exceptionMapping);
+        handler.adjustError(RollbackOnFailure.createAdjustError(logger));
+        handler.onError(createArrayInputErrorHandler());
+        return handler;
+    }
+
+    private void processInputs(RollbackOnFailure context, Integer[][] inputs, List<Integer> results) {
+        final ExternalProcedure p = new ExternalProcedure();
+        final MockComponentLog componentLog = new MockComponentLog("processor-id", this);
+        final ExceptionHandler<RollbackOnFailure> handler = getContextAwareExceptionHandler(componentLog);
+
+        for (Integer[] input : inputs) {
+
+            if (!handler.execute(context, input, (in) -> {
+                results.add(p.divide(in[0], in[1]));
+                context.proceed();
+            })){
+                continue;
+            }
+
+            assertEquals(input[2], results.get(results.size() - 1));
+        }
+    }
+
+    @Test
+    public void testContextDefaultBehavior() {
+
+        // With rollbackOnFailure disabled, failed inputs are routed to Failure or Retry as usual.
+        final RollbackOnFailure context = new RollbackOnFailure(false, false);
+
+        Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
+
+        final List<Integer> results = new ArrayList<>();
+        try {
+            processInputs(context, inputs, results);
+        } catch (ProcessException e) {
+            fail("ProcessException should NOT be thrown");
+        }
+
+        assertEquals("Successful inputs", 2, context.getProcessedCount());
+    }
+
+    @Test
+    public void testContextRollbackOnFailureNonTransactionalFirstFailure() {
+
+        final RollbackOnFailure context = new RollbackOnFailure(true, false);
+
+        // If the first execution fails before any input has succeeded, a ProcessException should be thrown.
+        Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
+
+        final List<Integer> results = new ArrayList<>();
+        try {
+            processInputs(context, inputs, results);
+            fail("ProcessException should be thrown");
+        } catch (ProcessException e) {
+            logger.info("Exception was thrown as expected.");
+        }
+
+        assertEquals("Successful inputs", 0, context.getProcessedCount());
+    }
+
+    @Test
+    public void testContextRollbackOnFailureNonTransactionalAlreadySucceeded() {
+
+        final RollbackOnFailure context = new RollbackOnFailure(true, false);
+
+        // If an execution fails after some inputs have already succeeded, the failed input is transferred to Failure
+        // instead of throwing a ProcessException and processing keeps going, because the external system does not support transactions.
+        Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}};
+
+        final List<Integer> results = new ArrayList<>();
+        try {
+            processInputs(context, inputs, results);
+        } catch (ProcessException e) {
+            fail("ProcessException should NOT be thrown");
+        }
+
+        assertEquals("Successful inputs", 2, context.getProcessedCount());
+    }
+
+    @Test
+    public void testContextRollbackOnFailureTransactionalAlreadySucceeded() {
+
+        final RollbackOnFailure context = new RollbackOnFailure(true, true);
+
+        // Even if an execution fails after some inputs have already succeeded, a ProcessException is thrown
+        // so the whole session can be rolled back, because the external system supports transactions.
+        Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}};
+
+        final List<Integer> results = new ArrayList<>();
+        try {
+            processInputs(context, inputs, results);
+            fail("ProcessException should be thrown");
+        } catch (ProcessException e) {
+            logger.info("Exception was thrown as expected.");
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
new file mode 100644
index 0000000..dbb9318
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-record-utils</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>nifi-avro-record-utils</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+        <!-- Other modules using nifi-avro-record-utils are expected to have these APIs available, typically through a NAR dependency -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
new file mode 100644
index 0000000..4449afc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+public class AvroSchemaValidator implements Validator {
+
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(true)
+                .explanation("Expression Language is present")
+                .build();
+        }
+
+        try {
+            new Schema.Parser().parse(input);
+
+            return new ValidationResult.Builder()
+                .valid(true)
+                .build();
+        } catch (final Exception e) {
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(false)
+                .explanation("Not a valid Avro Schema: " + e.getMessage())
+                .build();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
new file mode 100644
index 0000000..3af368e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class AvroTypeUtil {
+    public static final String AVRO_SCHEMA_FORMAT = "avro";
+
+    public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
+        if (recordSchema == null) {
+            throw new IllegalArgumentException("RecordSchema cannot be null");
+        }
+
+        final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat();
+        if (!schemaFormatOption.isPresent()) {
+            throw new SchemaNotFoundException("No Schema Format was present in the RecordSchema");
+        }
+
+        final String schemaFormat = schemaFormatOption.get();
+        if (!schemaFormat.equals(AVRO_SCHEMA_FORMAT)) {
+            throw new SchemaNotFoundException("Schema provided is not in Avro format");
+        }
+
+        final Optional<String> textOption = recordSchema.getSchemaText();
+        if (!textOption.isPresent()) {
+            throw new SchemaNotFoundException("No Schema text was present in the RecordSchema");
+        }
+
+        final String text = textOption.get();
+        return new Schema.Parser().parse(text);
+    }
+
+    public static DataType determineDataType(final Schema avroSchema) {
+        final Type avroType = avroSchema.getType();
+
+        switch (avroType) {
+            case BYTES:
+            case FIXED:
+                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            case ARRAY:
+                final DataType elementType = determineDataType(avroSchema.getElementType());
+                return RecordFieldType.ARRAY.getArrayDataType(elementType);
+            case BOOLEAN:
+                return RecordFieldType.BOOLEAN.getDataType();
+            case DOUBLE:
+                return RecordFieldType.DOUBLE.getDataType();
+            case ENUM:
+            case STRING:
+                return RecordFieldType.STRING.getDataType();
+            case FLOAT:
+                return RecordFieldType.FLOAT.getDataType();
+            case INT: {
+                final LogicalType logicalType = avroSchema.getLogicalType();
+                if (logicalType == null) {
+                    return RecordFieldType.INT.getDataType();
+                }
+
+                if (LogicalTypes.date().getName().equals(logicalType.getName())) {
+                    return RecordFieldType.DATE.getDataType();
+                } else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) {
+                    return RecordFieldType.TIME.getDataType();
+                }
+
+                return RecordFieldType.INT.getDataType();
+            }
+            case LONG: {
+                final LogicalType logicalType = avroSchema.getLogicalType();
+                if (logicalType == null) {
+                    return RecordFieldType.LONG.getDataType();
+                }
+
+                if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) {
+                    return RecordFieldType.TIMESTAMP.getDataType();
+                } else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) {
+                    return RecordFieldType.TIMESTAMP.getDataType();
+                } else if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) {
+                    return RecordFieldType.TIME.getDataType();
+                }
+
+                return RecordFieldType.LONG.getDataType();
+            }
+            case RECORD: {
+                final List<Field> avroFields = avroSchema.getFields();
+                final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
+
+                for (final Field field : avroFields) {
+                    final String fieldName = field.name();
+                    final Schema fieldSchema = field.schema();
+                    final DataType fieldType = determineDataType(fieldSchema);
+                    recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
+                }
+
+                final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
+                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
+            }
+            case NULL:
+                return RecordFieldType.STRING.getDataType();
+            case MAP:
+                final Schema valueSchema = avroSchema.getValueType();
+                final DataType valueType = determineDataType(valueSchema);
+                return RecordFieldType.MAP.getMapDataType(valueType);
+            case UNION: {
+                final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
+                    .filter(s -> s.getType() != Type.NULL)
+                    .collect(Collectors.toList());
+
+                if (nonNullSubSchemas.size() == 1) {
+                    return determineDataType(nonNullSubSchemas.get(0));
+                }
+
+                final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
+                for (final Schema subSchema : nonNullSubSchemas) {
+                    final DataType childDataType = determineDataType(subSchema);
+                    possibleChildTypes.add(childDataType);
+                }
+
+                return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
+            }
+        }
+
+        return null;
+    }
+
+    public static RecordSchema createSchema(final Schema avroSchema) {
+        if (avroSchema == null) {
+            throw new IllegalArgumentException("Avro Schema cannot be null");
+        }
+
+        final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
+        for (final Field field : avroSchema.getFields()) {
+            final String fieldName = field.name();
+            final DataType dataType = AvroTypeUtil.determineDataType(field.schema());
+
+            recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases()));
+        }
+
+        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
+        return recordSchema;
+    }
+
+    public static Object[] convertByteArray(final byte[] bytes) {
+        final Object[] array = new Object[bytes.length];
+        for (int i = 0; i < bytes.length; i++) {
+            array[i] = Byte.valueOf(bytes[i]);
+        }
+        return array;
+    }
+
+    public static ByteBuffer convertByteArray(final Object[] bytes) {
+        final ByteBuffer bb = ByteBuffer.allocate(bytes.length);
+        for (final Object o : bytes) {
+            if (o instanceof Byte) {
+                bb.put(((Byte) o).byteValue());
+            } else {
+                throw new IllegalTypeConversionException("Cannot convert value " + bytes + " of type " + bytes.getClass() + " to ByteBuffer");
+            }
+        }
+        bb.flip();
+        return bb;
+    }
+
+    public static GenericRecord createAvroRecord(final Record record, final Schema avroSchema) throws IOException {
+        final GenericRecord rec = new GenericData.Record(avroSchema);
+        final RecordSchema recordSchema = record.getSchema();
+
+        for (final RecordField recordField : recordSchema.getFields()) {
+            final Object rawValue = record.getValue(recordField);
+            final String fieldName = recordField.getFieldName();
+
+            final Field field = avroSchema.getField(fieldName);
+            if (field == null) {
+                continue;
+            }
+
+            final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName);
+            rec.put(fieldName, converted);
+        }
+
+        return rec;
+    }
+
+    private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) throws IOException {
+        if (rawValue == null) {
+            return null;
+        }
+
+        switch (fieldSchema.getType()) {
+            case INT: {
+                final LogicalType logicalType = fieldSchema.getLogicalType();
+                if (logicalType == null) {
+                    return DataTypeUtils.toInteger(rawValue, fieldName);
+                }
+
+                if (LogicalTypes.date().getName().equals(logicalType.getName())) {
+                    final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
+                    final Date date = new Date(longValue);
+                    final Duration duration = Duration.between(new Date(0L).toInstant(), date.toInstant());
+                    final long days = duration.toDays();
+                    return (int) days;
+                } else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) {
+                    final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
+                    final Date date = new Date(longValue);
+                    final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
+                    final long millisSinceMidnight = duration.toMillis();
+                    return (int) millisSinceMidnight;
+                }
+
+                return DataTypeUtils.toInteger(rawValue, fieldName);
+            }
+            case LONG: {
+                final LogicalType logicalType = fieldSchema.getLogicalType();
+                if (logicalType == null) {
+                    return DataTypeUtils.toLong(rawValue, fieldName);
+                }
+
+                if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) {
+                    final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
+                    final Date date = new Date(longValue);
+                    final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
+                    return duration.toMillis() * 1000L;
+                } else if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) {
+                    return DataTypeUtils.toLong(rawValue, fieldName);
+                } else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) {
+                    return DataTypeUtils.toLong(rawValue, fieldName) * 1000L;
+                }
+
+                return DataTypeUtils.toLong(rawValue, fieldName);
+            }
+            case BYTES:
+            case FIXED:
+                if (rawValue instanceof byte[]) {
+                    return ByteBuffer.wrap((byte[]) rawValue);
+                }
+                if (rawValue instanceof Object[]) {
+                    return AvroTypeUtil.convertByteArray((Object[]) rawValue);
+                } else {
+                    throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
+                }
+            case MAP:
+                if (rawValue instanceof Record) {
+                    final Record recordValue = (Record) rawValue;
+                    final Map<String, Object> map = new HashMap<>();
+                    for (final RecordField recordField : recordValue.getSchema().getFields()) {
+                        final Object v = recordValue.getValue(recordField);
+                        if (v != null) {
+                            map.put(recordField.getFieldName(), v);
+                        }
+                    }
+
+                    return map;
+                } else {
+                    throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map");
+                }
+            case RECORD:
+                final GenericData.Record avroRecord = new GenericData.Record(fieldSchema);
+
+                final Record record = (Record) rawValue;
+                for (final RecordField recordField : record.getSchema().getFields()) {
+                    final Object recordFieldValue = record.getValue(recordField);
+                    final String recordFieldName = recordField.getFieldName();
+
+                    final Field field = fieldSchema.getField(recordFieldName);
+                    if (field == null) {
+                        continue;
+                    }
+
+                    final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName);
+                    avroRecord.put(recordFieldName, converted);
+                }
+                return avroRecord;
+            case UNION:
+                List<Schema> unionFieldSchemas = fieldSchema.getTypes();
+                if (unionFieldSchemas != null) {
+                    // Ignore null types in union
+                    final List<Schema> nonNullFieldSchemas = unionFieldSchemas.stream()
+                            .filter(s -> s.getType() != Type.NULL)
+                            .collect(Collectors.toList());
+
+                    // If at least one non-null type exists, find the first compatible type
+                    if (nonNullFieldSchemas.size() >= 1) {
+                        for (final Schema nonNullFieldSchema : nonNullFieldSchemas) {
+                            final Object avroObject = convertToAvroObject(rawValue, nonNullFieldSchema, fieldName);
+                            final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema);
+                            if (DataTypeUtils.isCompatibleDataType(avroObject, desiredDataType)) {
+                                return avroObject;
+                            }
+                        }
+
+                        throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass()
+                                + " because no compatible types exist in the UNION");
+                    }
+                }
+                return null;
+            case ARRAY:
+                final Object[] objectArray = (Object[]) rawValue;
+                final List<Object> list = new ArrayList<>(objectArray.length);
+                for (final Object o : objectArray) {
+                    final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName);
+                    list.add(converted);
+                }
+                return list;
+            case BOOLEAN:
+                return DataTypeUtils.toBoolean(rawValue, fieldName);
+            case DOUBLE:
+                return DataTypeUtils.toDouble(rawValue, fieldName);
+            case FLOAT:
+                return DataTypeUtils.toFloat(rawValue, fieldName);
+            case NULL:
+                return null;
+            case ENUM:
+                return new GenericData.EnumSymbol(fieldSchema, rawValue);
+            case STRING:
+                return DataTypeUtils.toString(rawValue, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
+        }
+
+        return rawValue;
+    }
+
+    public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema) {
+        final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());
+
+        for (final RecordField recordField : recordSchema.getFields()) {
+            Object value = avroRecord.get(recordField.getFieldName());
+            if (value == null) {
+                for (final String alias : recordField.getAliases()) {
+                    value = avroRecord.get(alias);
+                    if (value != null) {
+                        break;
+                    }
+                }
+            }
+
+            final String fieldName = recordField.getFieldName();
+            final Field avroField = avroRecord.getSchema().getField(fieldName);
+            if (avroField == null) {
+                values.put(fieldName, null);
+                continue;
+            }
+
+            final Schema fieldSchema = avroField.schema();
+            final Object rawValue = normalizeValue(value, fieldSchema);
+
+            final DataType desiredType = recordField.getDataType();
+            final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName);
+
+            values.put(fieldName, coercedValue);
+        }
+
+        return values;
+    }
+
+    private static Object normalizeValue(final Object value, final Schema avroSchema) {
+        if (value == null) {
+            return null;
+        }
+
+        switch (avroSchema.getType()) {
+            case INT: {
+                final LogicalType logicalType = avroSchema.getLogicalType();
+                if (logicalType == null) {
+                    return value;
+                }
+
+                final String logicalName = logicalType.getName();
+                if (LogicalTypes.date().getName().equals(logicalName)) {
+                    // the 'date' logical type means the value is the number of days since Jan 1, 1970
+                    return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value));
+                } else if (LogicalTypes.timeMillis().getName().equals(logicalName)) {
+                    // the 'time-millis' logical type means the value is the number of milliseconds since midnight
+                    return new java.sql.Time((int) value);
+                }
+
+                break;
+            }
+            case LONG: {
+                final LogicalType logicalType = avroSchema.getLogicalType();
+                if (logicalType == null) {
+                    return value;
+                }
+
+                final String logicalName = logicalType.getName();
+                if (LogicalTypes.timeMicros().getName().equals(logicalName)) {
+                    return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value));
+                } else if (LogicalTypes.timestampMillis().getName().equals(logicalName)) {
+                    return new java.sql.Timestamp((long) value);
+                } else if (LogicalTypes.timestampMicros().getName().equals(logicalName)) {
+                    return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value));
+                }
+                break;
+            }
+            case UNION:
+                if (value instanceof GenericData.Record) {
+                    final GenericData.Record avroRecord = (GenericData.Record) value;
+                    return normalizeValue(value, avroRecord.getSchema());
+                }
+                break;
+            case RECORD:
+                final GenericData.Record record = (GenericData.Record) value;
+                final Schema recordSchema = record.getSchema();
+                final List<Field> recordFields = recordSchema.getFields();
+                final Map<String, Object> values = new HashMap<>(recordFields.size());
+                for (final Field field : recordFields) {
+                    final Object avroFieldValue = record.get(field.name());
+                    final Object fieldValue = normalizeValue(avroFieldValue, field.schema());
+                    values.put(field.name(), fieldValue);
+                }
+                final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema);
+                return new MapRecord(childSchema, values);
+            case BYTES:
+                final ByteBuffer bb = (ByteBuffer) value;
+                return AvroTypeUtil.convertByteArray(bb.array());
+            case FIXED:
+                final GenericFixed fixed = (GenericFixed) value;
+                return AvroTypeUtil.convertByteArray(fixed.bytes());
+            case ENUM:
+                return value.toString();
+            case NULL:
+                return null;
+            case STRING:
+                return value.toString();
+            case ARRAY:
+                final GenericData.Array<?> array = (GenericData.Array<?>) value;
+                final Object[] valueArray = new Object[array.size()];
+                for (int i = 0; i < array.size(); i++) {
+                    final Schema elementSchema = avroSchema.getElementType();
+                    valueArray[i] = normalizeValue(array.get(i), elementSchema);
+                }
+                return valueArray;
+            case MAP:
+                final Map<?, ?> avroMap = (Map<?, ?>) value;
+                final Map<String, Object> map = new HashMap<>(avroMap.size());
+                for (final Map.Entry<?, ?> entry : avroMap.entrySet()) {
+                    Object obj = entry.getValue();
+                    if (obj instanceof Utf8 || obj instanceof CharSequence) {
+                        obj = obj.toString();
+                    }
+
+                    final String key = entry.getKey().toString();
+                    obj = normalizeValue(obj, avroSchema.getValueType());
+
+                    map.put(key, obj);
+                }
+
+                final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType());
+                final List<RecordField> mapFields = new ArrayList<>();
+                for (final String key : map.keySet()) {
+                    mapFields.add(new RecordField(key, elementType));
+                }
+                final RecordSchema mapSchema = new SimpleRecordSchema(mapFields);
+                return new MapRecord(mapSchema, map);
+        }
+
+        return value;
+    }
+
+}
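
Editor's note: a short, self-contained sketch (not part of the commit) of the round trip AvroTypeUtil enables — deriving a NiFi RecordSchema from an Avro schema and turning a NiFi Record back into an Avro GenericRecord. The example schema, field names, and class name are invented for illustration; the AvroTypeUtil and MapRecord calls are exactly those defined above.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.nifi.avro.AvroTypeUtil;
    import org.apache.nifi.serialization.record.MapRecord;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordSchema;

    public class AvroTypeUtilSketch {
        public static void main(final String[] args) throws IOException {
            final Schema avroSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"age\",\"type\":\"int\"}]}");

            // Avro Schema -> NiFi RecordSchema
            final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);

            // Build a NiFi Record and convert it back into an Avro GenericRecord
            final Map<String, Object> values = new HashMap<>();
            values.put("name", "alice");
            values.put("age", 30);
            final Record record = new MapRecord(recordSchema, values);

            final GenericRecord avroRecord = AvroTypeUtil.createAvroRecord(record, avroSchema);
            System.out.println(avroRecord); // prints the populated Avro record
        }
    }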

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
new file mode 100644
index 0000000..3155909
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.Set;
+
+public class AvroSchemaTextStrategy implements SchemaAccessStrategy {
+    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
+
+    private static final Logger logger = LoggerFactory.getLogger(AvroSchemaTextStrategy.class);
+    private final PropertyValue schemaTextPropertyValue;
+
+    public AvroSchemaTextStrategy(final PropertyValue schemaTextPropertyValue) {
+        this.schemaTextPropertyValue = schemaTextPropertyValue;
+    }
+
+    @Override
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+        final String schemaText = schemaTextPropertyValue.evaluateAttributeExpressions(flowFile).getValue();
+        if (schemaText == null || schemaText.trim().isEmpty()) {
+            throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Text");
+        }
+
+        logger.debug("For {} found schema text {}", flowFile, schemaText);
+
+        try {
+            final Schema avroSchema = new Schema.Parser().parse(schemaText);
+            return AvroTypeUtil.createSchema(avroSchema);
+        } catch (final Exception e) {
+            throw new SchemaNotFoundException("Failed to create schema from the Schema Text after evaluating FlowFile Attributes", e);
+        }
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return schemaFields;
+    }
+}
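
Editor's note: a compilable sketch (not part of the commit) of how the strategy above would typically be wired into a record reader or writer. It assumes the component declares SchemaAccessUtils.SCHEMA_TEXT (defined in the next file) as one of its properties; the resolveSchema helper and class name are hypothetical, while ProcessContext.getProperty and the AvroSchemaTextStrategy constructor are as shown in this commit.

    import java.io.InputStream;

    import org.apache.nifi.components.PropertyValue;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.schema.access.AvroSchemaTextStrategy;
    import org.apache.nifi.schema.access.SchemaAccessStrategy;
    import org.apache.nifi.schema.access.SchemaAccessUtils;
    import org.apache.nifi.schema.access.SchemaNotFoundException;
    import org.apache.nifi.serialization.record.RecordSchema;

    public class SchemaTextSketch {
        // Hypothetical helper: resolves the RecordSchema for a FlowFile from the 'Schema Text' property.
        static RecordSchema resolveSchema(final ProcessContext context, final FlowFile flowFile, final InputStream content)
                throws SchemaNotFoundException {
            final PropertyValue schemaText = context.getProperty(SchemaAccessUtils.SCHEMA_TEXT);
            final SchemaAccessStrategy strategy = new AvroSchemaTextStrategy(schemaText);
            return strategy.getSchema(flowFile, content);
        }
    }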

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
new file mode 100644
index 0000000..cab9c02
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.avro.AvroSchemaValidator;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class SchemaAccessUtils {
+
+    public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
+            "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
+    public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
+            "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
+                    + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
+    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
+            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
+                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
+                    + "found at https://github.com/hortonworks/registry");
+    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
+            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
+
+    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
+            .name("schema-registry")
+            .displayName("Schema Registry")
+            .description("Specifies the Controller Service to use for the Schema Registry")
+            .identifiesControllerService(SchemaRegistry.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
+            .name("schema-access-strategy")
+            .displayName("Schema Access Strategy")
+            .description("Specifies how to obtain the schema that is to be used for interpreting the data.")
+            .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
+            .defaultValue(SCHEMA_NAME_PROPERTY.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+            .name("schema-name")
+            .displayName("Schema Name")
+            .description("Specifies the name of the schema to lookup in the Schema Registry property")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("${schema.name}")
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
+            .name("schema-text")
+            .displayName("Schema Text")
+            .description("The text of an Avro-formatted Schema")
+            .addValidator(new AvroSchemaValidator())
+            .expressionLanguageSupported(true)
+            .defaultValue("${avro.schema}")
+            .required(false)
+            .build();
+
+    public static Collection<ValidationResult> validateSchemaAccessStrategy(final ValidationContext validationContext, final String schemaAccessStrategyValue,
+                                                                            final List<AllowableValue> schemaAccessStrategyValues) {
+        if (isSchemaRegistryRequired(schemaAccessStrategyValue)) {
+            final boolean registrySet = validationContext.getProperty(SCHEMA_REGISTRY).isSet();
+            if (!registrySet) {
+                final String schemaAccessStrategyName = getSchemaAccessStrategyName(schemaAccessStrategyValue, schemaAccessStrategyValues);
+
+                return Collections.singleton(new ValidationResult.Builder()
+                        .subject("Schema Registry")
+                        .explanation("The '" + schemaAccessStrategyName + "' Schema Access Strategy requires that the Schema Registry property be set.")
+                        .valid(false)
+                        .build());
+            }
+        }
+
+        return Collections.emptyList();
+    }
+
+    private static String getSchemaAccessStrategyName(final String schemaAccessValue, final List<AllowableValue> schemaAccessStrategyValues) {
+        for (final AllowableValue allowableValue : schemaAccessStrategyValues) {
+            if (allowableValue.getValue().equalsIgnoreCase(schemaAccessValue)) {
+                return allowableValue.getDisplayName();
+            }
+        }
+
+        return null;
+    }
+
+    private static boolean isSchemaRegistryRequired(final String schemaAccessValue) {
+        return HWX_CONTENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(schemaAccessValue) || SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(schemaAccessValue)
+                || HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue);
+    }
+
+    public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ProcessContext context) {
+        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
+            return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+        } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
+            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
+        } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
+            return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
+        } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
+            return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+        }
+
+        return null;
+    }
+
+    public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
+        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
+            return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+        } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
+            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
+        } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
+            return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
+        } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
+            return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+        }
+
+        return null;
+    }
+
+    public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
+        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
+            return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+        } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
+            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
+        } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
+            return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
+        } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
+            return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+        }
+
+        return null;
+    }
+
+}
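
The 'HWX Content-Encoded Schema Reference' strategy described above expects a fixed 13-byte header at the start of the FlowFile content: one byte of protocol version, eight bytes of schema identifier, and four bytes of schema version. The decoding itself is performed by HortonworksEncodedSchemaReferenceStrategy (not included in this hunk); the following is only a minimal sketch of that layout, assuming the big-endian byte order that java.io.DataInputStream reads:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Illustrative only: mirrors the header layout described for HWX_CONTENT_ENCODED_SCHEMA.
    final class EncodedSchemaReference {
        final int protocolVersion;
        final long schemaId;
        final int schemaVersion;

        private EncodedSchemaReference(final int protocolVersion, final long schemaId, final int schemaVersion) {
            this.protocolVersion = protocolVersion;
            this.schemaId = schemaId;
            this.schemaVersion = schemaVersion;
        }

        static EncodedSchemaReference readFrom(final InputStream in) throws IOException {
            final DataInputStream dis = new DataInputStream(in);
            final int protocolVersion = dis.readUnsignedByte(); // 1 byte: protocol version
            final long schemaId = dis.readLong();                // 8 bytes: schema identifier
            final int schemaVersion = dis.readInt();             // 4 bytes: schema version
            return new EncodedSchemaReference(protocolVersion, schemaId, schemaVersion);
        }
    }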

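A record reader/writer or processor that reuses these shared descriptors would typically list them in getSupportedPropertyDescriptors(), call validateSchemaAccessStrategy(...) from customValidate(...), and resolve a SchemaAccessStrategy once scheduled. The processor class below is purely hypothetical and only sketches that wiring under those assumptions:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    import org.apache.nifi.annotation.lifecycle.OnScheduled;
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.components.ValidationContext;
    import org.apache.nifi.components.ValidationResult;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.schema.access.SchemaAccessStrategy;
    import org.apache.nifi.schema.access.SchemaAccessUtils;
    import org.apache.nifi.schemaregistry.services.SchemaRegistry;

    // Hypothetical processor; only the use of SchemaAccessUtils reflects this commit.
    public class ExampleRecordProcessor extends AbstractProcessor {

        private volatile SchemaAccessStrategy schemaAccessStrategy;

        @Override
        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            final List<PropertyDescriptor> props = new ArrayList<>();
            props.add(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY);
            props.add(SchemaAccessUtils.SCHEMA_REGISTRY);
            props.add(SchemaAccessUtils.SCHEMA_NAME);
            props.add(SchemaAccessUtils.SCHEMA_TEXT);
            return props;
        }

        @Override
        protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
            final String strategy = validationContext.getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
            return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, strategy,
                    Arrays.asList(SchemaAccessUtils.SCHEMA_NAME_PROPERTY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY,
                            SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES, SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA));
        }

        @OnScheduled
        public void onScheduled(final ProcessContext context) {
            final SchemaRegistry registry = context.getProperty(SchemaAccessUtils.SCHEMA_REGISTRY)
                    .asControllerService(SchemaRegistry.class);
            final String strategy = context.getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
            schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(strategy, registry, context);
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            // a real processor would read or write records here using the resolved strategy
        }
    }
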
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/pom.xml
new file mode 100644
index 0000000..5b5ddaf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-record-utils</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-hadoop-record-utils</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+        </dependency>
+        <!-- Other modules using nifi-hadoop-record-utils are expected to have the below dependencies available, typically through a NAR dependency -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
new file mode 100644
index 0000000..9f4f5ac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Base processor for reading data from HDFS that can be fetched into records.
+ */
+@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
+@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty
+public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
+
+    public static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+            .name("filename")
+            .displayName("Filename")
+            .description("The name of the file to retrieve")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .defaultValue("${path}/${filename}")
+            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The service for writing records to the FlowFile content")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles will be routed to this relationship once they have been updated with the content of the file")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved and trying again will likely not be helpful. "
+                    + "This would occur, for instance, if the file is not found or if there is a permissions issue")
+            .build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved, but might be able to be in the future if tried again. "
+                    + "This generally indicates that the Fetch should be tried again.")
+            .build();
+
+    public static final String FETCH_FAILURE_REASON_ATTR = "fetch.failure.reason";
+    public static final String RECORD_COUNT_ATTR = "record.count";
+
+    private volatile Set<Relationship> fetchHdfsRecordRelationships;
+    private volatile List<PropertyDescriptor> fetchHdfsRecordProperties;
+
+    @Override
+    protected final void init(final ProcessorInitializationContext context) {
+        super.init(context);
+
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_RETRY);
+        rels.add(REL_FAILURE);
+        this.fetchHdfsRecordRelationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> props = new ArrayList<>(properties);
+        props.add(FILENAME);
+        props.add(RECORD_WRITER);
+        props.addAll(getAdditionalProperties());
+        this.fetchHdfsRecordProperties = Collections.unmodifiableList(props);
+    }
+
+    /**
+     * Allows sub-classes to add additional properties; called from init().
+     *
+     * @return additional properties to add to the overall list
+     */
+    public List<PropertyDescriptor> getAdditionalProperties() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public final Set<Relationship> getRelationships() {
+        return fetchHdfsRecordRelationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return fetchHdfsRecordProperties;
+    }
+
+    /**
+     * Sub-classes provide the appropriate HDFSRecordReader.
+     *
+     * @param context the process context to obtain additional configuration
+     * @param flowFile the flow file whose content is being fetched
+     * @param conf the Configuration instance
+     * @param path the path to read from
+     * @return the HDFSRecordReader
+     * @throws IOException if an error occurs creating the writer
+     */
+    public abstract HDFSRecordReader createHDFSRecordReader(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path)
+            throws IOException;
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        // do this before getting a flow file so that we always get a chance to attempt Kerberos relogin
+        final FileSystem fileSystem = getFileSystem();
+        final Configuration configuration = getConfiguration();
+        final UserGroupInformation ugi = getUserGroupInformation();
+
+        if (configuration == null || fileSystem == null || ugi == null) {
+            getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
+            context.yield();
+            return;
+        }
+
+        final FlowFile originalFlowFile = session.get();
+        if (originalFlowFile == null) {
+            context.yield();
+            return;
+        }
+
+
+        ugi.doAs((PrivilegedAction<Object>)() -> {
+            FlowFile child = null;
+            final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
+            try {
+                final Path path = new Path(filenameValue);
+                final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
+                final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
+
+                final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+                final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), originalFlowFile, new NullInputStream(0));
+
+                final StopWatch stopWatch = new StopWatch(true);
+
+                // use a child FlowFile so that if any error occurs we can route the original untouched FlowFile to retry/failure
+                child = session.create(originalFlowFile);
+                child = session.write(child, (final OutputStream rawOut) -> {
+                    try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
+                         final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
+
+                        final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
+
+                        final RecordSet recordSet = new RecordSet() {
+                            @Override
+                            public RecordSchema getSchema() throws IOException {
+                                return emptySchema;
+                            }
+
+                            @Override
+                            public Record next() throws IOException {
+                                return recordReader.nextRecord();
+                            }
+                        };
+
+                        writeResult.set(recordSetWriter.write(recordSet, out));
+                    } catch (Exception e) {
+                        exceptionHolder.set(e);
+                    }
+                });
+
+                stopWatch.stop();
+
+                // if any errors happened within the session.write then throw the exception so we jump
+                // into one of the appropriate catch blocks below
+                if (exceptionHolder.get() != null) {
+                    throw exceptionHolder.get();
+                }
+
+                FlowFile successFlowFile = postProcess(context, session, child, path);
+
+                final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
+                attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
+                successFlowFile = session.putAllAttributes(successFlowFile, attributes);
+
+                final URI uri = path.toUri();
+                getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
+                session.getProvenanceReporter().fetch(successFlowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+                session.transfer(successFlowFile, REL_SUCCESS);
+                session.remove(originalFlowFile);
+                return null;
+
+            } catch (final FileNotFoundException | AccessControlException e) {
+                getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, e});
+                final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage());
+                session.transfer(failureFlowFile, REL_FAILURE);
+            } catch (final IOException | FlowFileAccessException e) {
+                getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to retry", new Object[] {filenameValue, originalFlowFile, e});
+                session.transfer(session.penalize(originalFlowFile), REL_RETRY);
+                context.yield();
+            } catch (final Throwable t) {
+                getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, t});
+                final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, t.getMessage());
+                session.transfer(failureFlowFile, REL_FAILURE);
+            }
+
+            // if we got this far then we weren't successful so we need to clean up the child flow file if it got initialized
+            if (child != null) {
+                session.remove(child);
+            }
+
+            return null;
+        });
+
+    }
+
+    /**
+     * This method will be called after the content of the file has been successfully fetched and written to the child FlowFile,
+     * in order to give sub-classes a chance to take action before transferring to success.
+     *
+     * @param context the context
+     * @param session the session
+     * @param flowFile the flow file being processed
+     * @param fetchPath the path that was fetched
+     * @return an updated FlowFile reference
+     */
+    protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path fetchPath) {
+        return flowFile;
+    }
+}
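
A concrete fetch processor only needs to supply the HDFSRecordReader; in this commit that role is filled by FetchParquet. The subclass below is a hypothetical placeholder, assuming HDFSRecordReader exposes just the nextRecord() and close() methods used in onTrigger above:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processors.hadoop.AbstractFetchHDFSRecord;
    import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
    import org.apache.nifi.serialization.record.Record;

    // Hypothetical subclass: returns an empty record stream. A real implementation
    // such as FetchParquet would open the file at 'path' and map its rows to Records.
    public class FetchEmptyRecord extends AbstractFetchHDFSRecord {

        @Override
        public HDFSRecordReader createHDFSRecordReader(final ProcessContext context, final FlowFile flowFile,
                                                       final Configuration conf, final Path path) throws IOException {
            return new HDFSRecordReader() {
                @Override
                public Record nextRecord() {
                    return null; // signals end of records; a real reader would read from 'path'
                }

                @Override
                public void close() {
                    // nothing to release in this sketch
                }
            };
        }
    }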