You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/24 21:03:53 UTC

[7/7] nifi git commit: NIFI-3682: This closes #1682. Add Schema Access Strategy and Schema Write Strategy Record Readers and Writers; bug fixes.

NIFI-3682: This closes #1682. Add Schema Access Strategy and Schema Write Strategy Record Readers and Writers; bug fixes.

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/50ea1083
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/50ea1083
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/50ea1083

Branch: refs/heads/master
Commit: 50ea1083ec416c5d6e70c4e7bb5b2abde8f6266c
Parents: 57ccf97
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Apr 19 12:39:35 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Mon Apr 24 17:02:45 2017 -0400

----------------------------------------------------------------------
 .../apache/nifi/components/PropertyValue.java   |   9 +
 .../expression/language/EmptyPreparedQuery.java |   5 +
 .../language/InvalidPreparedQuery.java          |   5 +
 .../expression/language/PreparedQuery.java      |   2 +
 .../language/StandardPreparedQuery.java         |   5 +
 .../language/StandardPropertyValue.java         |   5 +
 .../apache/nifi/util/MockProcessSession.java    |  12 +-
 .../org/apache/nifi/util/MockPropertyValue.java |  13 +
 .../apache/nifi/util/MockSessionFactory.java    |   6 +-
 .../nifi/util/StandardProcessorTestRunner.java  |  13 +-
 .../java/org/apache/nifi/util/TestRunner.java   |  16 +
 .../scheduling/ConnectableProcessContext.java   |   5 +
 .../src/main/resources/conf/logback.xml         |   2 +
 .../services/AvroSchemaRegistry.java            |  86 +--
 .../services/TestAvroSchemaRegistry.java        |  19 +-
 .../nifi/processors/standard/ConvertRecord.java |  20 +-
 .../nifi/processors/standard/QueryFlowFile.java | 554 -----------------
 .../nifi/processors/standard/QueryRecord.java   | 587 +++++++++++++++++++
 .../nifi/queryflowfile/FlowFileEnumerator.java  | 146 -----
 .../FlowFileProjectTableScanRule.java           |  76 ---
 .../nifi/queryflowfile/FlowFileTable.java       | 203 -------
 .../nifi/queryflowfile/FlowFileTableScan.java   |  91 ---
 .../nifi/queryrecord/FlowFileEnumerator.java    | 157 +++++
 .../FlowFileProjectTableScanRule.java           |  76 +++
 .../apache/nifi/queryrecord/FlowFileTable.java  | 218 +++++++
 .../nifi/queryrecord/FlowFileTableScan.java     |  91 +++
 .../org.apache.nifi.processor.Processor         |   2 +-
 .../additionalDetails.html                      |  48 --
 .../additionalDetails.html                      |  48 ++
 .../processors/standard/TestQueryFlowFile.java  | 265 ---------
 .../processors/standard/TestQueryRecord.java    | 297 ++++++++++
 .../standard/util/record/MockRecordParser.java  |  12 +-
 .../standard/util/record/MockRecordWriter.java  |  10 +-
 .../apache/nifi/schema/access/SchemaField.java  |  37 ++
 .../schema/access/SchemaNotFoundException.java  |  32 +
 .../nifi/serialization/RecordReaderFactory.java |  37 ++
 .../serialization/RecordSetWriterFactory.java   |  37 +-
 .../serialization/RowRecordReaderFactory.java   |  38 --
 .../nifi/serialization/SimpleRecordSchema.java  |  73 ++-
 .../nifi/serialization/record/DataType.java     |  12 +-
 .../nifi/serialization/record/MapRecord.java    |  98 +++-
 .../nifi/serialization/record/Record.java       |   4 +
 .../nifi/serialization/record/RecordField.java  |  47 +-
 .../serialization/record/RecordFieldType.java   |  53 +-
 .../nifi/serialization/record/RecordSchema.java |  21 +
 .../record/ResultSetRecordSet.java              |  14 +
 .../serialization/record/SchemaIdentifier.java  |  96 +++
 .../record/type/ArrayDataType.java              |   2 +-
 .../record/type/ChoiceDataType.java             |   2 +-
 .../serialization/record/type/MapDataType.java  |  67 +++
 .../record/type/RecordDataType.java             |   5 +
 .../record/util/DataTypeUtils.java              | 170 ++++--
 .../serialization/TestSimpleRecordSchema.java   |  79 +++
 .../serialization/record/TestMapRecord.java     | 188 ++++++
 .../nifi-record-serialization-services/pom.xml  |   7 +-
 .../java/org/apache/nifi/avro/AvroReader.java   |  38 +-
 .../nifi/avro/AvroReaderWithEmbeddedSchema.java |  62 ++
 .../nifi/avro/AvroReaderWithExplicitSchema.java |  75 +++
 .../org/apache/nifi/avro/AvroRecordReader.java  |  56 +-
 .../apache/nifi/avro/AvroRecordSetWriter.java   |  81 ++-
 .../apache/nifi/avro/AvroSchemaValidator.java   |   9 +
 .../java/org/apache/nifi/avro/AvroTypeUtil.java |  38 +-
 .../org/apache/nifi/avro/WriteAvroResult.java   |  76 +--
 .../avro/WriteAvroResultWithExternalSchema.java |  75 +++
 .../nifi/avro/WriteAvroResultWithSchema.java    |  62 ++
 .../nifi/csv/CSVHeaderSchemaStrategy.java       |  67 +++
 .../java/org/apache/nifi/csv/CSVReader.java     |  49 +-
 .../org/apache/nifi/csv/CSVRecordReader.java    |  32 +-
 .../org/apache/nifi/csv/CSVRecordSetWriter.java |  14 +-
 .../main/java/org/apache/nifi/csv/CSVUtils.java |  27 +-
 .../org/apache/nifi/csv/WriteCSVResult.java     |  40 +-
 .../java/org/apache/nifi/grok/GrokReader.java   | 126 +++-
 .../org/apache/nifi/grok/GrokRecordReader.java  | 125 +---
 .../nifi/json/AbstractJsonRowRecordReader.java  |  57 --
 .../org/apache/nifi/json/JsonPathReader.java    |  11 +-
 .../nifi/json/JsonPathRowRecordReader.java      |  19 +-
 .../apache/nifi/json/JsonRecordSetWriter.java   |  10 +-
 .../org/apache/nifi/json/JsonTreeReader.java    |  11 +-
 .../nifi/json/JsonTreeRowRecordReader.java      |  81 ++-
 .../org/apache/nifi/json/WriteJsonResult.java   |  77 ++-
 .../schema/access/AvroSchemaTextStrategy.java   |  64 ++
 ...onworksAttributeSchemaReferenceStrategy.java | 116 ++++
 ...rtonworksAttributeSchemaReferenceWriter.java |  69 +++
 ...rtonworksEncodedSchemaReferenceStrategy.java |  77 +++
 ...HortonworksEncodedSchemaReferenceWriter.java |  78 +++
 .../schema/access/SchemaAccessStrategy.java     |  43 ++
 .../nifi/schema/access/SchemaAccessWriter.java  |  63 ++
 .../schema/access/SchemaNameAsAttribute.java    |  62 ++
 .../access/SchemaNamePropertyStrategy.java      |  69 +++
 .../schema/access/SchemaTextAsAttribute.java    |  60 ++
 .../DateTimeTextRecordSetWriter.java            |  11 +-
 .../SchemaRegistryRecordReader.java             | 110 ----
 .../SchemaRegistryRecordSetWriter.java          | 164 ++++++
 .../serialization/SchemaRegistryService.java    | 227 +++++++
 .../nifi/text/FreeFormTextRecordSetWriter.java  |   9 +-
 .../apache/nifi/text/FreeFormTextWriter.java    |  23 +-
 .../avro/TestAvroReaderWithEmbeddedSchema.java  | 290 +++++++++
 .../apache/nifi/avro/TestAvroRecordReader.java  | 296 ----------
 .../apache/nifi/avro/TestWriteAvroResult.java   |  40 +-
 .../avro/TestWriteAvroResultWithSchema.java     |  46 ++
 .../avro/TestWriteAvroResultWithoutSchema.java  |  56 ++
 .../nifi/csv/TestCSVHeaderSchemaStrategy.java   |  69 +++
 .../org/apache/nifi/csv/TestWriteCSVResult.java |   7 +-
 .../apache/nifi/grok/TestGrokRecordReader.java  |  10 +-
 .../nifi/json/TestJsonTreeRowRecordReader.java  |  23 +
 .../apache/nifi/json/TestWriteJsonResult.java   |  14 +-
 .../test/resources/json/output/dataTypes.json   |   6 +-
 .../single-bank-account-wrong-field-type.json   |  13 +
 .../schemaregistry/services/SchemaRegistry.java |  66 ++-
 109 files changed, 5284 insertions(+), 2438 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java
index efe76ee..05f262f 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java
@@ -324,4 +324,13 @@ public interface PropertyValue {
      * Exception to be thrown
      */
     PropertyValue evaluateAttributeExpressions(FlowFile flowFile, AttributeValueDecorator decorator) throws ProcessException;
+
+    /**
+     * <p>
+     * Indicates whether the value of the property uses Expression Language.
+     * </p>
+     *
+     * @return <code>true</code> if the property value makes use of the Expression Language, <code>false</code> otherwise.
+     */
+    boolean isExpressionLanguagePresent();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java
index 4037531..a435b08 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java
@@ -38,4 +38,9 @@ public class EmptyPreparedQuery implements PreparedQuery {
     public String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator, Map<String, String> stateVariables) throws ProcessException {
         return value;
     }
+
+    @Override
+    public boolean isExpressionLanguagePresent() {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java
index 1033c71..ce0dec3 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java
@@ -47,4 +47,9 @@ public class InvalidPreparedQuery implements PreparedQuery {
     public String evaluateExpressions( Map<String, String> valueLookup, AttributeValueDecorator decorator, Map<String, String> stateVariables) throws ProcessException {
         throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation);
     }
+
+    @Override
+    public boolean isExpressionLanguagePresent() {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java
index 53f7296..5552cac 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java
@@ -26,4 +26,6 @@ public interface PreparedQuery {
     String evaluateExpressions(Map<String, String> valueLookup, AttributeValueDecorator decorator) throws ProcessException;
 
     String evaluateExpressions(final Map<String, String> valueLookup, final AttributeValueDecorator decorator, final Map<String, String> stateVariables) throws ProcessException;
+
+    boolean isExpressionLanguagePresent();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java
index 39cfb25..9f12c92 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java
@@ -59,4 +59,9 @@ public class StandardPreparedQuery implements PreparedQuery {
             throws ProcessException {
         return evaluateExpressions(valMap, decorator, null);
     }
+
+    @Override
+    public boolean isExpressionLanguagePresent() {
+        return !trees.isEmpty();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java
index 94c1c50..ac370bd 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java
@@ -195,4 +195,9 @@ public class StandardPropertyValue implements PropertyValue {
     public boolean isSet() {
         return rawValue != null;
     }
+
+    @Override
+    public boolean isExpressionLanguagePresent() {
+        return preparedQuery.isExpressionLanguagePresent();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index faf6e42..f05b9b3 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -72,6 +72,7 @@ public class MockProcessSession implements ProcessSession {
     private final Map<String, Long> counterMap = new HashMap<>();
     private final Set<FlowFile> recursionSet = new HashSet<>();
     private final MockProvenanceReporter provenanceReporter;
+    private final boolean enforceReadStreamsClosed;
 
     // A List of InputStreams that have been created by calls to {@link #read(FlowFile)} and have not yet been closed.
     private final Map<FlowFile, InputStream> openInputStreams = new HashMap<>();
@@ -83,7 +84,12 @@ public class MockProcessSession implements ProcessSession {
     private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
 
     public MockProcessSession(final SharedSessionState sharedState, final Processor processor) {
+        this(sharedState, processor, true);
+    }
+
+    public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceReadStreamsClosed) {
         this.processor = processor;
+        this.enforceReadStreamsClosed = enforceReadStreamsClosed;
         this.sharedState = sharedState;
         this.processorQueue = sharedState.getFlowFileQueue();
         provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
@@ -218,8 +224,10 @@ public class MockProcessSession implements ProcessSession {
                 }
             }
 
-            throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via "
-                + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy);
+            if (enforceReadStreamsClosed) {
+                throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via "
+                    + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy);
+            }
         }
 
         committed = true;

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
index b6752a7..c55ad23 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
@@ -16,10 +16,13 @@
  */
 package org.apache.nifi.util;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.attribute.expression.language.Query;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.attribute.expression.language.Query.Range;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.controller.ControllerService;
@@ -219,4 +222,14 @@ public class MockPropertyValue implements PropertyValue {
     public String toString() {
         return getValue();
     }
+
+    @Override
+    public boolean isExpressionLanguagePresent() {
+        if (!expectExpressions) {
+            return false;
+        }
+
+        final List<Range> elRanges = Query.extractExpressionRanges(rawValue);
+        return (elRanges != null && !elRanges.isEmpty());
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
index 49b8796..010cc97 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
@@ -29,15 +29,17 @@ public class MockSessionFactory implements ProcessSessionFactory {
     private final Processor processor;
     private final SharedSessionState sharedState;
     private final Set<MockProcessSession> createdSessions = new CopyOnWriteArraySet<>();
+    private final boolean enforceReadStreamsClosed;
 
-    MockSessionFactory(final SharedSessionState sharedState, final Processor processor) {
+    MockSessionFactory(final SharedSessionState sharedState, final Processor processor, final boolean enforceReadStreamsClosed) {
         this.sharedState = sharedState;
         this.processor = processor;
+        this.enforceReadStreamsClosed = enforceReadStreamsClosed;
     }
 
     @Override
     public ProcessSession createSession() {
-        final MockProcessSession session = new MockProcessSession(sharedState, processor);
+        final MockProcessSession session = new MockProcessSession(sharedState, processor, enforceReadStreamsClosed);
         createdSessions.add(session);
         return session;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 451eea9..6c35643 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -78,7 +78,6 @@ public class StandardProcessorTestRunner implements TestRunner {
     private final Processor processor;
     private final MockProcessContext context;
     private final MockFlowFileQueue flowFileQueue;
-    private final MockSessionFactory sessionFactory;
     private final SharedSessionState sharedState;
     private final AtomicLong idGenerator;
     private final boolean triggerSerially;
@@ -87,17 +86,19 @@ public class StandardProcessorTestRunner implements TestRunner {
     private final MockVariableRegistry variableRegistry;
 
     private int numThreads = 1;
+    private MockSessionFactory sessionFactory;
     private final AtomicInteger invocations = new AtomicInteger(0);
 
     private final Map<String, MockComponentLog> controllerServiceLoggers = new HashMap<>();
     private final MockComponentLog logger;
+    private boolean enforceReadStreamsClosed = true;
 
     StandardProcessorTestRunner(final Processor processor) {
         this.processor = processor;
         this.idGenerator = new AtomicLong(0L);
         this.sharedState = new SharedSessionState(processor, idGenerator);
         this.flowFileQueue = sharedState.getFlowFileQueue();
-        this.sessionFactory = new MockSessionFactory(sharedState, processor);
+        this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed);
         this.processorStateManager = new MockStateManager(processor);
         this.variableRegistry = new MockVariableRegistry();
         this.context = new MockProcessContext(processor, processorStateManager, variableRegistry);
@@ -118,6 +119,12 @@ public class StandardProcessorTestRunner implements TestRunner {
     }
 
     @Override
+    public void enforceReadStreamsClosed(final boolean enforce) {
+        enforceReadStreamsClosed = enforce;
+        this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed);
+    }
+
+    @Override
     public void setValidateExpressionUsage(final boolean validate) {
         context.setValidateExpressionUsage(validate);
     }
@@ -412,7 +419,7 @@ public class StandardProcessorTestRunner implements TestRunner {
 
     @Override
     public MockFlowFile enqueue(final InputStream data, final Map<String, String> attributes) {
-        final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor);
+        final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor, enforceReadStreamsClosed);
         MockFlowFile flowFile = session.create();
         flowFile = session.importFrom(data, flowFile);
         flowFile = session.putAllAttributes(flowFile, attributes);

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 1c014c3..9a1a10d 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -30,6 +30,7 @@ import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
@@ -949,4 +950,19 @@ public interface TestRunner {
      * @param predicate conditions
      */
     void assertAllConditionsMet(final Relationship relationship, Predicate<MockFlowFile> predicate);
+
+    /**
+     * By default, if {@link ProcessSession#read(FlowFile)} is called, the InputStream that is returned MUST be closed by
+     * the processor under test or calls to {@link ProcessSession#commit()} will throw an Exception. This method allows
+     * the developer to indicate explicitly that they do or do not want this functionality. The ProcessSession that is used
+     * in the framework when running NiFi does not enforce this, as the framework itself tracks the InputStreams that it returns
+     * and ensures that they are properly closed on session commit or rollback. However, it is considered a best practice for
+     * Processors to close the streams themselves whenever they are no longer needed. There may be cases, however, where this
+     * is not feasible or easy and this method provides developers the ability to indicate that by disabling enforcement so that
+     * the framework will handle this.
+     *
+     * @param enforce <code>true</code> if calls to session.commit() should fail if the read streams are not properly closed.
+     */
+    void enforceReadStreamsClosed(boolean enforce);
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index 7c4ce77..0d755b0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -166,6 +166,11 @@ public class ConnectableProcessContext implements ProcessContext {
                     throws ProcessException {
                 return null;
             }
+
+            @Override
+            public boolean isExpressionLanguagePresent() {
+                return false;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
index f357c41..f2da200 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
@@ -96,6 +96,8 @@
     <logger name="org.apache.zookeeper.ZooKeeper" level="ERROR" />
     <logger name="org.apache.zookeeper.server.PrepRequestProcessor" level="ERROR" />
 
+    <logger name="org.apache.calcite.runtime.CalciteException" level="OFF" />
+
     <logger name="org.apache.curator.framework.recipes.leader.LeaderSelector" level="OFF" />
     <logger name="org.apache.curator.ConnectionState" level="OFF" />
     

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
index 13b1d5d..8fcb016 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
@@ -16,10 +16,13 @@
  */
 package org.apache.nifi.schemaregistry.services;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.avro.LogicalType;
@@ -34,18 +37,21 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
 
 @Tags({"schema", "registry", "avro", "json", "csv"})
 @CapabilityDescription("Provides a service for registering and accessing schemas. You can register a schema "
     + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual "
     + "representation of the actual schema following the syntax and semantics of Avro's Schema format.")
 public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
-
+    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
     private final Map<String, String> schemaNameToSchemaMap;
 
     private static final String LOGICAL_TYPE_DATE = "date";
@@ -54,39 +60,51 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
     private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
     private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
 
-
     public AvroSchemaRegistry() {
         this.schemaNameToSchemaMap = new HashMap<>();
     }
 
-    @OnEnabled
-    public void enable(ConfigurationContext configuratiponContext) throws InitializationException {
-        this.schemaNameToSchemaMap.putAll(configuratiponContext.getProperties().entrySet().stream()
-            .filter(propEntry -> propEntry.getKey().isDynamic())
-            .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
+    @Override
+    public String retrieveSchemaText(final String schemaName) throws SchemaNotFoundException {
+        final String schemaText = schemaNameToSchemaMap.get(schemaName);
+        if (schemaText == null) {
+            throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'");
+        }
+
+        return schemaText;
     }
 
     @Override
-    public String retrieveSchemaText(String schemaName) {
-        if (!this.schemaNameToSchemaMap.containsKey(schemaName)) {
-            throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + ".");
-        } else {
-            return this.schemaNameToSchemaMap.get(schemaName);
-        }
+    public RecordSchema retrieveSchema(final String schemaName) throws SchemaNotFoundException {
+        final String schemaText = retrieveSchemaText(schemaName);
+        final Schema schema = new Schema.Parser().parse(schemaText);
+        return createRecordSchema(schema, schemaText, schemaName);
     }
 
     @Override
-    public String retrieveSchemaText(String schemaName, Map<String, String> attributes) {
-        throw new UnsupportedOperationException("This version of schema registry does not "
-            + "support this operation, since schemas are only identofied by name.");
+    public RecordSchema retrieveSchema(long schemaId, int version) throws IOException, SchemaNotFoundException {
+        throw new SchemaNotFoundException("This Schema Registry does not support schema lookup by identifier and version - only by name.");
     }
 
     @Override
+    public String retrieveSchemaText(long schemaId, int version) throws IOException, SchemaNotFoundException {
+        throw new SchemaNotFoundException("This Schema Registry does not support schema lookup by identifier and version - only by name.");
+    }
+
     @OnDisabled
     public void close() throws Exception {
-        this.schemaNameToSchemaMap.clear();
+        schemaNameToSchemaMap.clear();
     }
 
+
+    @OnEnabled
+    public void enable(final ConfigurationContext configurationContext) throws InitializationException {
+        this.schemaNameToSchemaMap.putAll(configurationContext.getProperties().entrySet().stream()
+            .filter(propEntry -> propEntry.getKey().isDynamic())
+            .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
+    }
+
+
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
@@ -99,28 +117,24 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
     }
 
 
-    @Override
-    public RecordSchema retrieveSchema(String schemaName) {
-        final String schemaText = this.retrieveSchemaText(schemaName);
-        final Schema schema = new Schema.Parser().parse(schemaText);
-        return createRecordSchema(schema);
-    }
-
     /**
      * Converts an Avro Schema to a RecordSchema
      *
      * @param avroSchema the Avro Schema to convert
+     * @param text the textual representation of the schema
+     * @param schemaName the name of the schema
      * @return the Corresponding Record Schema
      */
-    private RecordSchema createRecordSchema(final Schema avroSchema) {
+    private RecordSchema createRecordSchema(final Schema avroSchema, final String text, final String schemaName) {
         final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
         for (final Field field : avroSchema.getFields()) {
             final String fieldName = field.name();
             final DataType dataType = determineDataType(field.schema());
-            recordFields.add(new RecordField(fieldName, dataType));
+
+            recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases()));
         }
 
-        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, text, "avro", SchemaIdentifier.ofName(schemaName));
         return recordSchema;
     }
 
@@ -175,15 +189,19 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
                     final String fieldName = field.name();
                     final Schema fieldSchema = field.schema();
                     final DataType fieldType = determineDataType(fieldSchema);
-                    recordFields.add(new RecordField(fieldName, fieldType));
+
+                    recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
                 }
 
-                final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+                final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), "avro", SchemaIdentifier.EMPTY);
                 return RecordFieldType.RECORD.getRecordDataType(recordSchema);
             }
             case NULL:
+                return RecordFieldType.STRING.getDataType();
             case MAP:
-                return RecordFieldType.RECORD.getDataType();
+                final Schema valueSchema = avroSchema.getValueType();
+                final DataType valueType = determineDataType(valueSchema);
+                return RecordFieldType.MAP.getMapDataType(valueType);
             case UNION: {
                 final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
                     .filter(s -> s.getType() != Type.NULL)
@@ -206,12 +224,8 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
         return null;
     }
 
-    /*
-     * For this implementation 'attributes' argument is ignored since the underlying storage mechanisms
-     * is based strictly on key/value pairs. In other implementation additional attributes may play a role (e.g., version id,)
-     */
     @Override
-    public RecordSchema retrieveSchema(String schemaName, Map<String, String> attributes) {
-        return this.retrieveSchema(schemaName);
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return schemaFields;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
index 929aab9..a63097a 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.schemaregistry.services;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -27,9 +26,11 @@ import java.util.Map;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestAvroSchemaRegistry {
@@ -55,17 +56,17 @@ public class TestAvroSchemaRegistry {
         properties.put(fooSchema, fooSchemaText);
         properties.put(barSchema, "");
         when(configContext.getProperties()).thenReturn(properties);
-        SchemaRegistry delegate = new AvroSchemaRegistry();
-        ((AvroSchemaRegistry) delegate).enable(configContext);
+        AvroSchemaRegistry delegate = new AvroSchemaRegistry();
+        delegate.enable(configContext);
 
         String locatedSchemaText = delegate.retrieveSchemaText(schemaName);
         assertEquals(fooSchemaText, locatedSchemaText);
         try {
-            locatedSchemaText = delegate.retrieveSchemaText("barSchema");
-            fail();
-        } catch (Exception e) {
-            // ignore
+            delegate.retrieveSchemaText("barSchema");
+            Assert.fail("Expected a SchemaNotFoundException to be thrown but it was not");
+        } catch (final SchemaNotFoundException expected) {
         }
+
         delegate.close();
     }
 
@@ -91,8 +92,8 @@ public class TestAvroSchemaRegistry {
         properties.put(fooSchema, fooSchemaText);
         properties.put(barSchema, "");
         when(configContext.getProperties()).thenReturn(properties);
-        SchemaRegistry delegate = new AvroSchemaRegistry();
-        ((AvroSchemaRegistry) delegate).enable(configContext);
+        AvroSchemaRegistry delegate = new AvroSchemaRegistry();
+        delegate.enable(configContext);
 
         RecordSchema locatedSchema = delegate.retrieveSchema(schemaName);
         List<RecordField> recordFields = locatedSchema.getFields();

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
index 9a505a2..2b2caa4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.standard;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -46,11 +47,12 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.RowRecordReaderFactory;
 import org.apache.nifi.serialization.WriteResult;
 
 @EventDriven
@@ -74,7 +76,7 @@ public class ConvertRecord extends AbstractProcessor {
         .name("record-reader")
         .displayName("Record Reader")
         .description("Specifies the Controller Service to use for reading incoming data")
-        .identifiesControllerService(RowRecordReaderFactory.class)
+        .identifiesControllerService(RecordReaderFactory.class)
         .required(true)
         .build();
     static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
@@ -118,9 +120,17 @@ public class ConvertRecord extends AbstractProcessor {
             return;
         }
 
-        final RowRecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RowRecordReaderFactory.class);
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        final RecordSetWriter writer = writerFactory.createWriter(getLogger());
+        final RecordSetWriter writer;
+        try (final InputStream rawIn = session.read(flowFile);
+            final InputStream in = new BufferedInputStream(rawIn)) {
+            writer = writerFactory.createWriter(getLogger(), flowFile, in);
+        } catch (final Exception e) {
+            getLogger().error("Failed to convert records for {}; will route to failure", new Object[] {flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
 
         final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
 
@@ -134,7 +144,7 @@ public class ConvertRecord extends AbstractProcessor {
                         final WriteResult writeResult = writer.write(reader.createRecordSet(), out);
                         writeResultRef.set(writeResult);
 
-                    } catch (final MalformedRecordException e) {
+                    } catch (final SchemaNotFoundException | MalformedRecordException e) {
                         throw new ProcessException("Could not parse incoming data", e);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
deleted file mode 100644
index 83a3d4b..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
+++ /dev/null
@@ -1,554 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
-
-import org.apache.calcite.config.CalciteConnectionProperty;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.DynamicRelationship;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.queryflowfile.FlowFileTable;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.RowRecordReaderFactory;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.ResultSetRecordSet;
-import org.apache.nifi.util.StopWatch;
-
-@EventDriven
-@SideEffectFree
-@SupportsBatching
-@Tags({"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", "text", "avro", "aggregate"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Evaluates one or more SQL queries against the contents of a FlowFile. The result of the "
-    + "SQL query then becomes the content of the output FlowFile. This can be used, for example, "
-    + "for field-specific filtering, transformation, and row-level filtering. "
-    + "Columns can be renamed, simple calculations and aggregations performed, etc. "
-    + "The Processor is configured with a Record Reader Controller Service and a Record Writer service so as to allow flexibility in incoming and outgoing data formats. "
-    + "The Processor must be configured with at least one user-defined property. The name of the Property "
-    + "is the Relationship to route data to, and the value of the Property is a SQL SELECT statement that is used to specify how input data should be transformed/filtered. "
-    + "The SQL statement must be valid ANSI SQL and is powered by Apache Calcite. "
-    + "If the transformation fails, the original FlowFile is routed to the 'failure' relationship. Otherwise, the data selected will be routed to the associated "
-    + "relationship. See the Processor Usage documentation for more information.")
-@DynamicRelationship(name="<Property Name>", description="Each user-defined property defines a new Relationship for this Processor.")
-@DynamicProperty(name = "The name of the relationship to route data to", value="A SQL SELECT statement that is used to determine what data should be routed to this "
-        + "relationship.", supportsExpressionLanguage=true, description="Each user-defined property specifies a SQL SELECT statement to run over the data, with the data "
-        + "that is selected being routed to the relationship whose name is the property name")
-public class QueryFlowFile extends AbstractProcessor {
-    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
-        .name("record-reader")
-        .displayName("Record Reader")
-        .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
-        .identifiesControllerService(RowRecordReaderFactory.class)
-        .required(true)
-        .build();
-    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
-        .name("record-writer")
-        .displayName("Record Writer")
-        .description("Specifies the Controller Service to use for writing results to a FlowFile")
-        .identifiesControllerService(RecordSetWriterFactory.class)
-        .required(true)
-        .build();
-    static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
-        .name("include-zero-record-flowfiles")
-        .displayName("Include Zero Record FlowFiles")
-        .description("When running the SQL statement against an incoming FlowFile, if the result has no data, "
-            + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
-        .expressionLanguageSupported(false)
-        .allowableValues("true", "false")
-        .defaultValue("true")
-        .required(true)
-        .build();
-    static final PropertyDescriptor CACHE_SCHEMA = new PropertyDescriptor.Builder()
-        .name("cache-schema")
-        .displayName("Cache Schema")
-        .description("Parsing the SQL query and deriving the FlowFile's schema is relatively expensive. If this value is set to true, "
-            + "the Processor will cache these values so that the Processor is much more efficient and much faster. However, if this is done, "
-            + "then the schema that is derived for the first FlowFile processed must apply to all FlowFiles. If all FlowFiles will not have the exact "
-            + "same schema, or if the SQL SELECT statement uses the Expression Language, this value should be set to false.")
-        .expressionLanguageSupported(false)
-        .allowableValues("true", "false")
-        .defaultValue("true")
-        .required(true)
-        .build();
-
-    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
-        .name("original")
-        .description("The original FlowFile is routed to this relationship")
-        .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-        .name("failure")
-        .description("If a FlowFile fails processing for any reason (for example, the SQL "
-            + "statement contains columns not present in input data), the original FlowFile it will "
-            + "be routed to this relationship")
-        .build();
-
-    private List<PropertyDescriptor> properties;
-    private final Set<Relationship> relationships = Collections.synchronizedSet(new HashSet<>());
-
-    private final Map<String, BlockingQueue<CachedStatement>> statementQueues = new HashMap<>();
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        try {
-            DriverManager.registerDriver(new org.apache.calcite.jdbc.Driver());
-        } catch (final SQLException e) {
-            throw new ProcessException("Failed to load Calcite JDBC Driver", e);
-        }
-
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(RECORD_READER_FACTORY);
-        properties.add(RECORD_WRITER_FACTORY);
-        properties.add(INCLUDE_ZERO_RECORD_FLOWFILES);
-        properties.add(CACHE_SCHEMA);
-        this.properties = Collections.unmodifiableList(properties);
-
-        relationships.add(REL_FAILURE);
-        relationships.add(REL_ORIGINAL);
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
-        if (!descriptor.isDynamic()) {
-            return;
-        }
-
-        final Relationship relationship = new Relationship.Builder()
-            .name(descriptor.getName())
-            .description("User-defined relationship that specifies where data that matches the specified SQL query should be routed")
-            .build();
-
-        if (newValue == null) {
-            relationships.remove(relationship);
-        } else {
-            relationships.add(relationship);
-        }
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final boolean cache = validationContext.getProperty(CACHE_SCHEMA).asBoolean();
-        if (cache) {
-            for (final PropertyDescriptor descriptor : validationContext.getProperties().keySet()) {
-                if (descriptor.isDynamic() && validationContext.isExpressionLanguagePresent(validationContext.getProperty(descriptor).getValue())) {
-                    return Collections.singleton(new ValidationResult.Builder()
-                        .subject("Cache Schema")
-                        .input("true")
-                        .valid(false)
-                        .explanation("Cannot have 'Cache Schema' property set to true if any SQL statement makes use of the Expression Language")
-                        .build());
-                }
-            }
-        }
-
-        return Collections.emptyList();
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-            .name(propertyDescriptorName)
-            .description("SQL select statement specifies how data should be filtered/transformed. "
-                + "SQL SELECT should select from the FLOWFILE table")
-            .required(false)
-            .dynamic(true)
-            .expressionLanguageSupported(true)
-            .addValidator(new SqlValidator())
-            .build();
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        final FlowFile original = session.get();
-        if (original == null) {
-            return;
-        }
-
-        final StopWatch stopWatch = new StopWatch(true);
-
-        final RecordSetWriterFactory resultSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY)
-            .asControllerService(RecordSetWriterFactory.class);
-        final RowRecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
-            .asControllerService(RowRecordReaderFactory.class);
-
-        final RecordSetWriter resultSetWriter = resultSetWriterFactory.createWriter(getLogger());
-        final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
-        final Set<FlowFile> createdFlowFiles = new HashSet<>();
-
-        try {
-            for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
-                if (!descriptor.isDynamic()) {
-                    continue;
-                }
-
-                final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();
-
-                // We have to fork a child because we may need to read the input FlowFile more than once,
-                // and we cannot call session.read() on the original FlowFile while we are within a write
-                // callback for the original FlowFile.
-                FlowFile transformed = session.create(original);
-
-                // Ensure that we have the FlowFile in the map in case we throw any Exception
-                createdFlowFiles.add(transformed);
-
-                final String sql = context.getProperty(descriptor).evaluateAttributeExpressions(original).getValue();
-                final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
-                final QueryResult queryResult;
-                if (context.getProperty(CACHE_SCHEMA).asBoolean()) {
-                    queryResult = queryWithCache(session, original, sql, context, recordParserFactory);
-                } else {
-                    queryResult = query(session, original, sql, context, recordParserFactory);
-                }
-
-                try {
-                    final ResultSet rs = queryResult.getResultSet();
-                    transformed = session.write(transformed, new OutputStreamCallback() {
-                        @Override
-                        public void process(final OutputStream out) throws IOException {
-                            try {
-                                final ResultSetRecordSet recordSet = new ResultSetRecordSet(rs);
-                                writeResultRef.set(resultSetWriter.write(recordSet, out));
-                            } catch (final Exception e) {
-                                throw new IOException(e);
-                            }
-                        }
-                    });
-                } finally {
-                    closeQuietly(queryResult);
-                }
-
-                final WriteResult result = writeResultRef.get();
-                if (result.getRecordCount() == 0 && !context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean()) {
-                    session.remove(transformed);
-                    transformedFlowFiles.remove(transformed);
-                    getLogger().info("Transformed {} but the result contained no data so will not pass on a FlowFile", new Object[] {original});
-                } else {
-                    final Map<String, String> attributesToAdd = new HashMap<>();
-                    if (result.getAttributes() != null) {
-                        attributesToAdd.putAll(result.getAttributes());
-                    }
-
-                    attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), resultSetWriter.getMimeType());
-                    attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
-                    transformed = session.putAllAttributes(transformed, attributesToAdd);
-                    transformedFlowFiles.put(transformed, relationship);
-                }
-            }
-
-            final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
-            if (transformedFlowFiles.size() > 0) {
-                session.getProvenanceReporter().fork(original, transformedFlowFiles.keySet(), elapsedMillis);
-
-                for (final Map.Entry<FlowFile, Relationship> entry : transformedFlowFiles.entrySet()) {
-                    final FlowFile transformed = entry.getKey();
-                    final Relationship relationship = entry.getValue();
-
-                    session.getProvenanceReporter().route(transformed, relationship);
-                    session.transfer(transformed, relationship);
-                }
-            }
-
-            getLogger().info("Successfully transformed {} in {} millis", new Object[] {original, elapsedMillis});
-            session.transfer(original, REL_ORIGINAL);
-        } catch (ProcessException e) {
-            getLogger().error("Unable to transform {} due to {}", new Object[] {original, e});
-            session.remove(createdFlowFiles);
-            session.transfer(original, REL_FAILURE);
-        } catch (final SQLException e) {
-            getLogger().error("Unable to transform {} due to {}", new Object[] {original, e.getCause() == null ? e : e.getCause()});
-            session.remove(createdFlowFiles);
-            session.transfer(original, REL_FAILURE);
-        }
-    }
-
-    private synchronized CachedStatement getStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
-        final FlowFile flowFile, final RowRecordReaderFactory recordReaderFactory) throws SQLException {
-
-        final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
-        if (statementQueue == null) {
-            return buildCachedStatement(sql, connectionSupplier, session, flowFile, recordReaderFactory);
-        }
-
-        final CachedStatement cachedStmt = statementQueue.poll();
-        if (cachedStmt != null) {
-            return cachedStmt;
-        }
-
-        return buildCachedStatement(sql, connectionSupplier, session, flowFile, recordReaderFactory);
-    }
-
-    private CachedStatement buildCachedStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
-        final FlowFile flowFile, final RowRecordReaderFactory recordReaderFactory) throws SQLException {
-
-        final CalciteConnection connection = connectionSupplier.get();
-        final SchemaPlus rootSchema = connection.getRootSchema();
-
-        final FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordReaderFactory, getLogger());
-        rootSchema.add("FLOWFILE", flowFileTable);
-        rootSchema.setCacheEnabled(false);
-
-        final PreparedStatement stmt = connection.prepareStatement(sql);
-        return new CachedStatement(stmt, flowFileTable, connection);
-    }
-
-    @OnStopped
-    public synchronized void cleanup() {
-        for (final BlockingQueue<CachedStatement> statementQueue : statementQueues.values()) {
-            CachedStatement stmt;
-            while ((stmt = statementQueue.poll()) != null) {
-                closeQuietly(stmt.getStatement(), stmt.getConnection());
-            }
-        }
-
-        statementQueues.clear();
-    }
-
-    @OnScheduled
-    public synchronized void setupQueues(final ProcessContext context) {
-        // Create a Queue of PreparedStatements for each property that is user-defined. This allows us to easily poll the
-        // queue and add as necessary, knowing that the queue already exists.
-        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
-            if (!descriptor.isDynamic()) {
-                continue;
-            }
-
-            final String sql = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
-            final BlockingQueue<CachedStatement> queue = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
-            statementQueues.put(sql, queue);
-        }
-    }
-
-    protected QueryResult queryWithCache(final ProcessSession session, final FlowFile flowFile, final String sql, final ProcessContext context,
-        final RowRecordReaderFactory recordParserFactory) throws SQLException {
-
-        final Supplier<CalciteConnection> connectionSupplier = () -> {
-            final Properties properties = new Properties();
-            properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
-
-            try {
-                final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
-                final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-                return calciteConnection;
-            } catch (final Exception e) {
-                throw new ProcessException(e);
-            }
-        };
-
-        final CachedStatement cachedStatement = getStatement(sql, connectionSupplier, session, flowFile, recordParserFactory);
-        final PreparedStatement stmt = cachedStatement.getStatement();
-        final FlowFileTable<?, ?> table = cachedStatement.getTable();
-        table.setFlowFile(session, flowFile);
-
-        final ResultSet rs = stmt.executeQuery();
-
-        return new QueryResult() {
-            @Override
-            public void close() throws IOException {
-                final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
-                if (statementQueue == null || !statementQueue.offer(cachedStatement)) {
-                    try {
-                        cachedStatement.getConnection().close();
-                    } catch (SQLException e) {
-                        throw new IOException("Failed to close statement", e);
-                    }
-                }
-            }
-
-            @Override
-            public ResultSet getResultSet() {
-                return rs;
-            }
-        };
-    }
-
-    protected QueryResult query(final ProcessSession session, final FlowFile flowFile, final String sql, final ProcessContext context,
-        final RowRecordReaderFactory recordParserFactory) throws SQLException {
-
-        final Properties properties = new Properties();
-        properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.JAVA.name());
-
-        Connection connection = null;
-        ResultSet resultSet = null;
-        Statement statement = null;
-        try {
-            connection = DriverManager.getConnection("jdbc:calcite:", properties);
-            final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-            final SchemaPlus rootSchema = calciteConnection.getRootSchema();
-
-            final FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordParserFactory, getLogger());
-            rootSchema.add("FLOWFILE", flowFileTable);
-            rootSchema.setCacheEnabled(false);
-
-            statement = connection.createStatement();
-            resultSet = statement.executeQuery(sql);
-
-            final ResultSet rs = resultSet;
-            final Statement stmt = statement;
-            final Connection conn = connection;
-            return new QueryResult() {
-                @Override
-                public void close() throws IOException {
-                    closeQuietly(rs, stmt, conn);
-                }
-
-                @Override
-                public ResultSet getResultSet() {
-                    return rs;
-                }
-            };
-        } catch (final Exception e) {
-            closeQuietly(resultSet, statement, connection);
-            throw e;
-        }
-    }
-
-    private void closeQuietly(final AutoCloseable... closeables) {
-        if (closeables == null) {
-            return;
-        }
-
-        for (final AutoCloseable closeable : closeables) {
-            if (closeable == null) {
-                continue;
-            }
-
-            try {
-                closeable.close();
-            } catch (final Exception e) {
-                getLogger().warn("Failed to close SQL resource", e);
-            }
-        }
-    }
-
-    private static class SqlValidator implements Validator {
-        @Override
-        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
-            if (context.isExpressionLanguagePresent(input)) {
-                return new ValidationResult.Builder()
-                    .input(input)
-                    .subject(subject)
-                    .valid(true)
-                    .explanation("Expression Language Present")
-                    .build();
-            }
-
-            final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
-            final SqlParser parser = SqlParser.create(substituted);
-            try {
-                parser.parseStmt();
-                return new ValidationResult.Builder()
-                    .subject(subject)
-                    .input(input)
-                    .valid(true)
-                    .build();
-            } catch (final Exception e) {
-                return new ValidationResult.Builder()
-                    .subject(subject)
-                    .input(input)
-                    .valid(false)
-                    .explanation("Not a valid SQL Statement: " + e.getMessage())
-                    .build();
-            }
-        }
-    }
-
-    private static interface QueryResult extends Closeable {
-        ResultSet getResultSet();
-    }
-
-    private static class CachedStatement {
-        private final FlowFileTable<?, ?> table;
-        private final PreparedStatement statement;
-        private final Connection connection;
-
-        public CachedStatement(final PreparedStatement statement, final FlowFileTable<?, ?> table, final Connection connection) {
-            this.statement = statement;
-            this.table = table;
-            this.connection = connection;
-        }
-
-        public FlowFileTable<?, ?> getTable() {
-            return table;
-        }
-
-        public PreparedStatement getStatement() {
-            return statement;
-        }
-
-        public Connection getConnection() {
-            return connection;
-        }
-    }
-}