Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/24 21:03:52 UTC

[6/7] nifi git commit: NIFI-3682: This closes #1682. Add Schema Access Strategy and Schema Write Strategy Record Readers and Writers; bug fixes.

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
new file mode 100644
index 0000000..5982908
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.queryrecord.FlowFileTable;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.StopWatch;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", "text", "avro", "aggregate"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Evaluates one or more SQL queries against the contents of a FlowFile. The result of the "
+    + "SQL query then becomes the content of the output FlowFile. This can be used, for example, "
+    + "for field-specific filtering, transformation, and row-level filtering. "
+    + "Columns can be renamed, simple calculations and aggregations performed, etc. "
+    + "The Processor is configured with a Record Reader Controller Service and a Record Writer service so as to allow flexibility in incoming and outgoing data formats. "
+    + "The Processor must be configured with at least one user-defined property. The name of the Property "
+    + "is the Relationship to route data to, and the value of the Property is a SQL SELECT statement that is used to specify how input data should be transformed/filtered. "
+    + "The SQL statement must be valid ANSI SQL and is powered by Apache Calcite. "
+    + "If the transformation fails, the original FlowFile is routed to the 'failure' relationship. Otherwise, the data selected will be routed to the associated "
+    + "relationship. See the Processor Usage documentation for more information.")
+@DynamicRelationship(name="<Property Name>", description="Each user-defined property defines a new Relationship for this Processor.")
+@DynamicProperty(name = "The name of the relationship to route data to", value="A SQL SELECT statement that is used to determine what data should be routed to this "
+        + "relationship.", supportsExpressionLanguage=true, description="Each user-defined property specifies a SQL SELECT statement to run over the data, with the data "
+        + "that is selected being routed to the relationship whose name is the property name")
+public class QueryRecord extends AbstractProcessor {
+    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Controller Service to use for writing results to a FlowFile")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+    static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
+        .name("include-zero-record-flowfiles")
+        .displayName("Include Zero Record FlowFiles")
+        .description("When running the SQL statement against an incoming FlowFile, if the result has no data, "
+            + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
+    static final PropertyDescriptor CACHE_SCHEMA = new PropertyDescriptor.Builder()
+        .name("cache-schema")
+        .displayName("Cache Schema")
+        .description("Parsing the SQL query and deriving the FlowFile's schema is relatively expensive. If this value is set to true, "
+            + "the Processor will cache these values so that the Processor is much more efficient and much faster. However, if this is done, "
+            + "then the schema that is derived for the first FlowFile processed must apply to all FlowFiles. If all FlowFiles will not have the exact "
+            + "same schema, or if the SQL SELECT statement uses the Expression Language, this value should be set to false.")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("The original FlowFile is routed to this relationship")
+        .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If a FlowFile fails processing for any reason (for example, the SQL "
+            + "statement contains columns not present in input data), the original FlowFile it will "
+            + "be routed to this relationship")
+        .build();
+
+    private List<PropertyDescriptor> properties;
+    private final Set<Relationship> relationships = Collections.synchronizedSet(new HashSet<>());
+
+    private final Map<String, BlockingQueue<CachedStatement>> statementQueues = new HashMap<>();
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        try {
+            DriverManager.registerDriver(new org.apache.calcite.jdbc.Driver());
+        } catch (final SQLException e) {
+            throw new ProcessException("Failed to load Calcite JDBC Driver", e);
+        }
+
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RECORD_READER_FACTORY);
+        properties.add(RECORD_WRITER_FACTORY);
+        properties.add(INCLUDE_ZERO_RECORD_FLOWFILES);
+        properties.add(CACHE_SCHEMA);
+        this.properties = Collections.unmodifiableList(properties);
+
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_ORIGINAL);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (!descriptor.isDynamic()) {
+            return;
+        }
+
+        final Relationship relationship = new Relationship.Builder()
+            .name(descriptor.getName())
+            .description("User-defined relationship that specifies where data that matches the specified SQL query should be routed")
+            .build();
+
+        if (newValue == null) {
+            relationships.remove(relationship);
+        } else {
+            relationships.add(relationship);
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final boolean cache = validationContext.getProperty(CACHE_SCHEMA).asBoolean();
+        if (cache) {
+            for (final PropertyDescriptor descriptor : validationContext.getProperties().keySet()) {
+                if (descriptor.isDynamic() && validationContext.isExpressionLanguagePresent(validationContext.getProperty(descriptor).getValue())) {
+                    return Collections.singleton(new ValidationResult.Builder()
+                        .subject("Cache Schema")
+                        .input("true")
+                        .valid(false)
+                        .explanation("Cannot have 'Cache Schema' property set to true if any SQL statement makes use of the Expression Language")
+                        .build());
+                }
+            }
+        }
+
+        return Collections.emptyList();
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .description("SQL select statement specifies how data should be filtered/transformed. "
+                + "SQL SELECT should select from the FLOWFILE table")
+            .required(false)
+            .dynamic(true)
+            .expressionLanguageSupported(true)
+            .addValidator(new SqlValidator())
+            .build();
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final StopWatch stopWatch = new StopWatch(true);
+
+        final RecordSetWriterFactory resultSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY)
+            .asControllerService(RecordSetWriterFactory.class);
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
+            .asControllerService(RecordReaderFactory.class);
+
+        final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
+        final Set<FlowFile> createdFlowFiles = new HashSet<>();
+
+        int recordsRead = 0;
+
+        try {
+            final RecordSetWriter resultSetWriter;
+            try (final InputStream rawIn = session.read(original);
+                final InputStream in = new BufferedInputStream(rawIn)) {
+                resultSetWriter = resultSetWriterFactory.createWriter(getLogger(), original, in);
+            }
+
+            for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+                if (!descriptor.isDynamic()) {
+                    continue;
+                }
+
+                final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();
+
+                // We have to fork a child because we may need to read the input FlowFile more than once,
+                // and we cannot call session.read() on the original FlowFile while we are within a write
+                // callback for the original FlowFile.
+                FlowFile transformed = session.create(original);
+                boolean flowFileRemoved = false;
+
+                try {
+                    final String sql = context.getProperty(descriptor).evaluateAttributeExpressions(original).getValue();
+                    final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
+                    final QueryResult queryResult;
+                    if (context.getProperty(CACHE_SCHEMA).asBoolean()) {
+                        queryResult = queryWithCache(session, original, sql, context, recordParserFactory);
+                    } else {
+                        queryResult = query(session, original, sql, context, recordParserFactory);
+                    }
+
+                    try {
+                        final ResultSet rs = queryResult.getResultSet();
+                        transformed = session.write(transformed, new OutputStreamCallback() {
+                            @Override
+                            public void process(final OutputStream out) throws IOException {
+                                try {
+                                    final ResultSetRecordSet recordSet = new ResultSetRecordSet(rs);
+                                    writeResultRef.set(resultSetWriter.write(recordSet, out));
+                                } catch (final Exception e) {
+                                    throw new IOException(e);
+                                }
+                            }
+                        });
+                    } finally {
+                        closeQuietly(queryResult);
+                    }
+
+                    recordsRead = Math.max(recordsRead, queryResult.getRecordsRead());
+                    final WriteResult result = writeResultRef.get();
+                    if (result.getRecordCount() == 0 && !context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean()) {
+                        session.remove(transformed);
+                        flowFileRemoved = true;
+                        transformedFlowFiles.remove(transformed);
+                        getLogger().info("Transformed {} but the result contained no data so will not pass on a FlowFile", new Object[] {original});
+                    } else {
+                        final Map<String, String> attributesToAdd = new HashMap<>();
+                        if (result.getAttributes() != null) {
+                            attributesToAdd.putAll(result.getAttributes());
+                        }
+
+                        attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), resultSetWriter.getMimeType());
+                        attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
+                        transformed = session.putAllAttributes(transformed, attributesToAdd);
+                        transformedFlowFiles.put(transformed, relationship);
+
+                        session.adjustCounter("Records Written", result.getRecordCount(), false);
+                    }
+                } finally {
+                    // Ensure that we have the FlowFile in the set in case we throw any Exception
+                    if (!flowFileRemoved) {
+                        createdFlowFiles.add(transformed);
+                    }
+                }
+            }
+
+            final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+            if (transformedFlowFiles.size() > 0) {
+                session.getProvenanceReporter().fork(original, transformedFlowFiles.keySet(), elapsedMillis);
+
+                for (final Map.Entry<FlowFile, Relationship> entry : transformedFlowFiles.entrySet()) {
+                    final FlowFile transformed = entry.getKey();
+                    final Relationship relationship = entry.getValue();
+
+                    session.getProvenanceReporter().route(transformed, relationship);
+                    session.transfer(transformed, relationship);
+                }
+            }
+
+            getLogger().info("Successfully queried {} in {} millis", new Object[] {original, elapsedMillis});
+            session.transfer(original, REL_ORIGINAL);
+        } catch (final SQLException e) {
+            getLogger().error("Unable to query {} due to {}", new Object[] {original, e.getCause() == null ? e : e.getCause()});
+            session.remove(createdFlowFiles);
+            session.transfer(original, REL_FAILURE);
+        } catch (final Exception e) {
+            getLogger().error("Unable to query {} due to {}", new Object[] {original, e});
+            session.remove(createdFlowFiles);
+            session.transfer(original, REL_FAILURE);
+        }
+
+        session.adjustCounter("Records Read", recordsRead, false);
+    }
+
+    private synchronized CachedStatement getStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
+        final FlowFile flowFile, final RecordReaderFactory recordReaderFactory) throws SQLException {
+
+        final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
+        if (statementQueue == null) {
+            return buildCachedStatement(sql, connectionSupplier, session, flowFile, recordReaderFactory);
+        }
+
+        final CachedStatement cachedStmt = statementQueue.poll();
+        if (cachedStmt != null) {
+            return cachedStmt;
+        }
+
+        return buildCachedStatement(sql, connectionSupplier, session, flowFile, recordReaderFactory);
+    }
+
+    private CachedStatement buildCachedStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
+        final FlowFile flowFile, final RecordReaderFactory recordReaderFactory) throws SQLException {
+
+        final CalciteConnection connection = connectionSupplier.get();
+        final SchemaPlus rootSchema = connection.getRootSchema();
+
+        final FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordReaderFactory, getLogger());
+        rootSchema.add("FLOWFILE", flowFileTable);
+        rootSchema.setCacheEnabled(false);
+
+        final PreparedStatement stmt = connection.prepareStatement(sql);
+        return new CachedStatement(stmt, flowFileTable, connection);
+    }
+
+    @OnStopped
+    public synchronized void cleanup() {
+        for (final BlockingQueue<CachedStatement> statementQueue : statementQueues.values()) {
+            CachedStatement stmt;
+            while ((stmt = statementQueue.poll()) != null) {
+                closeQuietly(stmt.getStatement(), stmt.getConnection());
+            }
+        }
+
+        statementQueues.clear();
+    }
+
+    @OnScheduled
+    public synchronized void setupQueues(final ProcessContext context) {
+        // Create a Queue of PreparedStatements for each property that is user-defined. This allows us to easily poll the
+        // queue and add as necessary, knowing that the queue already exists.
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (!descriptor.isDynamic()) {
+                continue;
+            }
+
+            final String sql = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
+            final BlockingQueue<CachedStatement> queue = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+            statementQueues.put(sql, queue);
+        }
+    }
+
+    protected QueryResult queryWithCache(final ProcessSession session, final FlowFile flowFile, final String sql, final ProcessContext context,
+        final RecordReaderFactory recordParserFactory) throws SQLException {
+
+        final Supplier<CalciteConnection> connectionSupplier = () -> {
+            final Properties properties = new Properties();
+            properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
+
+            try {
+                final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
+                final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+                return calciteConnection;
+            } catch (final Exception e) {
+                throw new ProcessException(e);
+            }
+        };
+
+        final CachedStatement cachedStatement = getStatement(sql, connectionSupplier, session, flowFile, recordParserFactory);
+        final PreparedStatement stmt = cachedStatement.getStatement();
+        final FlowFileTable<?, ?> table = cachedStatement.getTable();
+        table.setFlowFile(session, flowFile);
+
+        final ResultSet rs = stmt.executeQuery();
+
+        return new QueryResult() {
+            @Override
+            public void close() throws IOException {
+                final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
+                if (statementQueue == null || !statementQueue.offer(cachedStatement)) {
+                    try {
+                        cachedStatement.getConnection().close();
+                    } catch (SQLException e) {
+                        throw new IOException("Failed to close statement", e);
+                    }
+                }
+            }
+
+            @Override
+            public ResultSet getResultSet() {
+                return rs;
+            }
+
+            @Override
+            public int getRecordsRead() {
+                return table.getRecordsRead();
+            }
+
+        };
+    }
+
+    protected QueryResult query(final ProcessSession session, final FlowFile flowFile, final String sql, final ProcessContext context,
+        final RecordReaderFactory recordParserFactory) throws SQLException {
+
+        final Properties properties = new Properties();
+        properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.JAVA.name());
+
+        Connection connection = null;
+        ResultSet resultSet = null;
+        Statement statement = null;
+        try {
+            connection = DriverManager.getConnection("jdbc:calcite:", properties);
+            final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+            final SchemaPlus rootSchema = calciteConnection.getRootSchema();
+
+            final FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordParserFactory, getLogger());
+            rootSchema.add("FLOWFILE", flowFileTable);
+            rootSchema.setCacheEnabled(false);
+
+            statement = connection.createStatement();
+            resultSet = statement.executeQuery(sql);
+
+            final ResultSet rs = resultSet;
+            final Statement stmt = statement;
+            final Connection conn = connection;
+            return new QueryResult() {
+                @Override
+                public void close() throws IOException {
+                    closeQuietly(rs, stmt, conn);
+                }
+
+                @Override
+                public ResultSet getResultSet() {
+                    return rs;
+                }
+
+                @Override
+                public int getRecordsRead() {
+                    return flowFileTable.getRecordsRead();
+                }
+            };
+        } catch (final Exception e) {
+            closeQuietly(resultSet, statement, connection);
+            throw e;
+        }
+    }
+
+    private void closeQuietly(final AutoCloseable... closeables) {
+        if (closeables == null) {
+            return;
+        }
+
+        for (final AutoCloseable closeable : closeables) {
+            if (closeable == null) {
+                continue;
+            }
+
+            try {
+                closeable.close();
+            } catch (final Exception e) {
+                getLogger().warn("Failed to close SQL resource", e);
+            }
+        }
+    }
+
+    private static class SqlValidator implements Validator {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder()
+                    .input(input)
+                    .subject(subject)
+                    .valid(true)
+                    .explanation("Expression Language Present")
+                    .build();
+            }
+
+            final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
+            final SqlParser parser = SqlParser.create(substituted);
+            try {
+                parser.parseStmt();
+                return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(input)
+                    .valid(true)
+                    .build();
+            } catch (final Exception e) {
+                return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(input)
+                    .valid(false)
+                    .explanation("Not a valid SQL Statement: " + e.getMessage())
+                    .build();
+            }
+        }
+    }
+
+    private interface QueryResult extends Closeable {
+        ResultSet getResultSet();
+
+        int getRecordsRead();
+    }
+
+    private static class CachedStatement {
+        private final FlowFileTable<?, ?> table;
+        private final PreparedStatement statement;
+        private final Connection connection;
+
+        public CachedStatement(final PreparedStatement statement, final FlowFileTable<?, ?> table, final Connection connection) {
+            this.statement = statement;
+            this.table = table;
+            this.connection = connection;
+        }
+
+        public FlowFileTable<?, ?> getTable() {
+            return table;
+        }
+
+        public PreparedStatement getStatement() {
+            return statement;
+        }
+
+        public Connection getConnection() {
+            return connection;
+        }
+    }
+}
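
A quick usage sketch (not part of this patch) of the configuration model described in the @CapabilityDescription above, using NiFi's mock test framework. The MockRecordParser and MockRecordWriter helpers, the "adults" relationship name, and the (name, age) schema are illustrative assumptions rather than anything this commit defines; any RecordReaderFactory/RecordSetWriterFactory pair would do.

    import org.apache.nifi.processors.standard.QueryRecord;
    import org.apache.nifi.serialization.record.MockRecordParser;
    import org.apache.nifi.serialization.record.MockRecordWriter;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class QueryRecordSketch {
        public static void main(final String[] args) throws Exception {
            // The mock reader presents two records with a (name, age) schema;
            // the mock writer renders whatever each query selects.
            final MockRecordParser reader = new MockRecordParser();
            reader.addSchemaField("name", RecordFieldType.STRING);
            reader.addSchemaField("age", RecordFieldType.INT);
            reader.addRecord("Jane", 34);
            reader.addRecord("John", 12);
            final MockRecordWriter writer = new MockRecordWriter("header", false);

            final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
            runner.addControllerService("reader", reader);
            runner.enableControllerService(reader);
            runner.addControllerService("writer", writer);
            runner.enableControllerService(writer);
            runner.setProperty("record-reader", "reader");
            runner.setProperty("record-writer", "writer");

            // The dynamic property's NAME becomes a Relationship; its VALUE is
            // the SQL that is run against the FLOWFILE table.
            runner.setProperty("adults", "SELECT name FROM FLOWFILE WHERE age >= 21");

            runner.enqueue(new byte[0]); // content comes from the mock reader
            runner.run();

            runner.assertTransferCount("adults", 1);
            runner.assertTransferCount("original", 1);
        }
    }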

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
deleted file mode 100644
index 7daa002..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.queryflowfile;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RowRecordReaderFactory;
-import org.apache.nifi.serialization.record.Record;
-
-public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
-    private final ProcessSession session;
-    private final FlowFile flowFile;
-    private final ComponentLog logger;
-    private final RowRecordReaderFactory recordParserFactory;
-    private final int[] fields;
-
-    private InputStream rawIn;
-    private Object currentRow;
-    private RecordReader recordParser;
-
-    public FlowFileEnumerator(final ProcessSession session, final FlowFile flowFile, final ComponentLog logger, final RowRecordReaderFactory parserFactory, final int[] fields) {
-        this.session = session;
-        this.flowFile = flowFile;
-        this.recordParserFactory = parserFactory;
-        this.logger = logger;
-        this.fields = fields;
-        reset();
-    }
-
-    @Override
-    public Object current() {
-        return currentRow;
-    }
-
-    @Override
-    public boolean moveNext() {
-        currentRow = null;
-        while (currentRow == null) {
-            try {
-                currentRow = filterColumns(recordParser.nextRecord());
-                break;
-            } catch (final IOException e) {
-                logger.error("Failed to read next record in stream for " + flowFile + ". Assuming end of stream.", e);
-                currentRow = null;
-                break;
-            } catch (final MalformedRecordException mre) {
-                logger.error("Failed to parse record in stream for " + flowFile + ". Will skip record and continue reading", mre);
-            }
-        }
-
-        if (currentRow == null) {
-            // If we are out of data, close the InputStream. We do this because
-            // Calcite does not necessarily call our close() method.
-            close();
-        }
-        return (currentRow != null);
-    }
-
-    private Object filterColumns(final Record record) {
-        if (record == null) {
-            return null;
-        }
-
-        final Object[] row = record.getValues();
-
-        // If we want no fields or if the row is null, just return null
-        if (fields == null || row == null) {
-            return row;
-        }
-
-        // If we want only a single field, then Calcite is going to expect us to return
-        // the actual value, NOT a 1-element array of values.
-        if (fields.length == 1) {
-            final int desiredCellIndex = fields[0];
-            return row[desiredCellIndex];
-        }
-
-        // Create a new Object array that contains only the desired fields.
-        final Object[] filtered = new Object[fields.length];
-        for (int i = 0; i < fields.length; i++) {
-            final int indexToKeep = fields[i];
-            filtered[i] = row[indexToKeep];
-        }
-
-        return filtered;
-    }
-
-    @Override
-    public void reset() {
-        if (rawIn != null) {
-            try {
-                rawIn.close();
-            } catch (final Exception e) {
-                logger.warn("Could not close FlowFile's input due to " + e, e);
-            }
-        }
-
-        rawIn = session.read(flowFile);
-
-        try {
-            recordParser = recordParserFactory.createRecordReader(flowFile, rawIn, logger);
-        } catch (final MalformedRecordException | IOException e) {
-            throw new ProcessException("Failed to reset stream", e);
-        }
-    }
-
-    @Override
-    public void close() {
-        if (recordParser != null) {
-            try {
-                recordParser.close();
-            } catch (final Exception e) {
-                logger.warn("Failed to close decorated source for " + flowFile, e);
-            }
-        }
-
-        try {
-            rawIn.close();
-        } catch (final Exception e) {
-            logger.warn("Failed to close InputStream for " + flowFile, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java
deleted file mode 100644
index c5179c9..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.queryflowfile;
-
-import java.util.List;
-
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * Planner rule that projects from a {@link FlowFileTableScan} scan just the columns
- * needed to satisfy a projection. If the projection's expressions are trivial,
- * the projection is removed.
- */
-public class FlowFileProjectTableScanRule extends RelOptRule {
-    public static final FlowFileProjectTableScanRule INSTANCE = new FlowFileProjectTableScanRule();
-
-    private FlowFileProjectTableScanRule() {
-        super(
-            operand(LogicalProject.class,
-                operand(FlowFileTableScan.class, none())),
-            "FlowFileProjectTableScanRule");
-    }
-
-    @Override
-    public void onMatch(RelOptRuleCall call) {
-        final LogicalProject project = call.rel(0);
-        final FlowFileTableScan scan = call.rel(1);
-        final int[] fields = getProjectFields(project.getProjects());
-
-        if (fields == null) {
-            // Project contains expressions more complex than just field references.
-            return;
-        }
-
-        call.transformTo(
-            new FlowFileTableScan(
-                scan.getCluster(),
-                scan.getTable(),
-                scan.flowFileTable,
-                fields));
-    }
-
-    private int[] getProjectFields(List<RexNode> exps) {
-        final int[] fields = new int[exps.size()];
-
-        for (int i = 0; i < exps.size(); i++) {
-            final RexNode exp = exps.get(i);
-
-            if (exp instanceof RexInputRef) {
-                fields[i] = ((RexInputRef) exp).getIndex();
-            } else {
-                return null; // not a simple projection
-            }
-        }
-
-        return fields;
-    }
-}
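
For intuition about what this rule does to a query plan (the same rule is re-added under the new queryrecord package below), a small worked example; the schema and the queries are illustrative assumptions, not taken from the patch:

    // Schema assumed: FLOWFILE(name VARCHAR, age INTEGER).
    //
    // "SELECT age FROM FLOWFILE": the LogicalProject holds a single
    // RexInputRef($1), so getProjectFields returns {1} and the rule rewrites
    //
    //     LogicalProject(age=[$1])
    //       FlowFileTableScan(fields=[[0, 1]])
    //
    // into a scan that materializes only column 1, dropping the projection:
    //
    //     FlowFileTableScan(fields=[[1]])
    //
    // "SELECT age + 1 FROM FLOWFILE": the expression is not a bare RexInputRef,
    // getProjectFields returns null, and onMatch leaves the plan unchanged.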

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
deleted file mode 100644
index 27f0c42..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.queryflowfile;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.linq4j.AbstractEnumerable;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.QueryProvider;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.QueryableTable;
-import org.apache.calcite.schema.Schema.TableType;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Schemas;
-import org.apache.calcite.schema.TranslatableTable;
-import org.apache.calcite.schema.impl.AbstractTable;
-import org.apache.calcite.util.Pair;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RowRecordReaderFactory;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-
-public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable, TranslatableTable {
-
-    private final RowRecordReaderFactory recordParserFactory;
-    private final ComponentLog logger;
-
-    private RecordSchema recordSchema;
-    private RelDataType relDataType = null;
-
-    private volatile ProcessSession session;
-    private volatile FlowFile flowFile;
-
-    /**
-     * Creates a FlowFile table.
-     */
-    public FlowFileTable(final ProcessSession session, final FlowFile flowFile, final RowRecordReaderFactory recordParserFactory, final ComponentLog logger) {
-        this.session = session;
-        this.flowFile = flowFile;
-        this.recordParserFactory = recordParserFactory;
-        this.logger = logger;
-    }
-
-    public void setFlowFile(final ProcessSession session, final FlowFile flowFile) {
-        this.session = session;
-        this.flowFile = flowFile;
-    }
-
-
-    @Override
-    public String toString() {
-        return "FlowFileTable";
-    }
-
-    /**
-     * Returns an enumerable over a given projection of the fields.
-     *
-     * <p>
-     * Called from generated code.
-     */
-    public Enumerable<Object> project(final int[] fields) {
-        return new AbstractEnumerable<Object>() {
-            @Override
-            @SuppressWarnings({"unchecked", "rawtypes"})
-            public Enumerator<Object> enumerator() {
-                return new FlowFileEnumerator(session, flowFile, logger, recordParserFactory, fields);
-            }
-        };
-    }
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public Expression getExpression(final SchemaPlus schema, final String tableName, final Class clazz) {
-        return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
-    }
-
-    @Override
-    public Type getElementType() {
-        return Object[].class;
-    }
-
-    @Override
-    public <T> Queryable<T> asQueryable(final QueryProvider queryProvider, final SchemaPlus schema, final String tableName) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable relOptTable) {
-        // Request all fields.
-        final int fieldCount = relOptTable.getRowType().getFieldCount();
-        final int[] fields = new int[fieldCount];
-        for (int i = 0; i < fieldCount; i++) {
-            fields[i] = i;
-        }
-
-        return new FlowFileTableScan(context.getCluster(), relOptTable, this, fields);
-    }
-
-    @Override
-    public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
-        if (relDataType != null) {
-            return relDataType;
-        }
-
-        RecordSchema schema;
-        try (final InputStream in = session.read(flowFile)) {
-            final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, logger);
-            schema = recordParser.getSchema();
-        } catch (final MalformedRecordException | IOException e) {
-            throw new ProcessException("Failed to determine schema of data records for " + flowFile, e);
-        }
-
-        final List<String> names = new ArrayList<>();
-        final List<RelDataType> types = new ArrayList<>();
-
-        final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
-        for (final RecordField field : schema.getFields()) {
-            names.add(field.getFieldName());
-            types.add(getRelDataType(field.getDataType(), javaTypeFactory));
-        }
-
-        logger.debug("Found Schema: {}", new Object[] {schema});
-
-        if (recordSchema == null) {
-            recordSchema = schema;
-        }
-
-        relDataType = typeFactory.createStructType(Pair.zip(names, types));
-        return relDataType;
-    }
-
-    private RelDataType getRelDataType(final DataType fieldType, final JavaTypeFactory typeFactory) {
-        switch (fieldType.getFieldType()) {
-            case BOOLEAN:
-                return typeFactory.createJavaType(boolean.class);
-            case BYTE:
-                return typeFactory.createJavaType(byte.class);
-            case CHAR:
-                return typeFactory.createJavaType(char.class);
-            case DATE:
-                return typeFactory.createJavaType(java.sql.Date.class);
-            case DOUBLE:
-                return typeFactory.createJavaType(double.class);
-            case FLOAT:
-                return typeFactory.createJavaType(float.class);
-            case INT:
-                return typeFactory.createJavaType(int.class);
-            case SHORT:
-                return typeFactory.createJavaType(short.class);
-            case TIME:
-                return typeFactory.createJavaType(java.sql.Time.class);
-            case TIMESTAMP:
-                return typeFactory.createJavaType(java.sql.Timestamp.class);
-            case LONG:
-                return typeFactory.createJavaType(long.class);
-            case STRING:
-                return typeFactory.createJavaType(String.class);
-            case ARRAY:
-                return typeFactory.createJavaType(Object[].class);
-            case RECORD:
-                return typeFactory.createJavaType(Object.class);
-        }
-
-        throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType);
-    }
-
-    @Override
-    public TableType getJdbcTableType() {
-        return TableType.TEMPORARY_TABLE;
-    }
-}
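
The getRowType/getRelDataType pair above is the schema bridge between NiFi records and Calcite. A minimal standalone sketch of the same construction (the two-field schema is an assumption; JavaTypeFactoryImpl is Calcite's stock type factory):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.calcite.adapter.java.JavaTypeFactory;
    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.util.Pair;

    public class RowTypeSketch {
        public static void main(final String[] args) {
            // Field names and Java types are zipped into a Calcite struct type,
            // exactly as FlowFileTable.getRowType does for a record schema.
            final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl();
            final List<String> names = Arrays.asList("name", "age");
            final List<RelDataType> types = Arrays.asList(
                typeFactory.createJavaType(String.class),
                typeFactory.createJavaType(int.class));

            final RelDataType rowType = typeFactory.createStructType(Pair.zip(names, types));
            System.out.println(rowType.getFieldNames()); // [name, age]
        }
    }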

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java
deleted file mode 100644
index ad3a1c3..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.queryflowfile;
-
-import java.util.List;
-
-import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.adapter.enumerable.EnumerableRel;
-import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
-import org.apache.calcite.adapter.enumerable.PhysType;
-import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
-import org.apache.calcite.linq4j.tree.Blocks;
-import org.apache.calcite.linq4j.tree.Expressions;
-import org.apache.calcite.linq4j.tree.Primitive;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-
-/**
- * Relational expression representing a scan of a FlowFile.
- *
- * <p>
- * Like any table scan, it serves as a leaf node of a query tree.
- * </p>
- */
-public class FlowFileTableScan extends TableScan implements EnumerableRel {
-    final FlowFileTable<?, ?> flowFileTable;
-    final int[] fields;
-
-    protected FlowFileTableScan(final RelOptCluster cluster, final RelOptTable table, final FlowFileTable<?, ?> flowFileTable, final int[] fields) {
-        super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table);
-
-        this.flowFileTable = flowFileTable;
-        this.fields = fields;
-    }
-
-    @Override
-    public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs) {
-        return new FlowFileTableScan(getCluster(), table, flowFileTable, fields);
-    }
-
-    @Override
-    public RelWriter explainTerms(final RelWriter pw) {
-        return super.explainTerms(pw).item("fields", Primitive.asList(fields));
-    }
-
-    @Override
-    public RelDataType deriveRowType() {
-        final List<RelDataTypeField> fieldList = table.getRowType().getFieldList();
-        final RelDataTypeFactory.FieldInfoBuilder builder = getCluster().getTypeFactory().builder();
-        for (int field : fields) {
-            builder.add(fieldList.get(field));
-        }
-        return builder.build();
-    }
-
-    @Override
-    public void register(RelOptPlanner planner) {
-        planner.addRule(FlowFileProjectTableScanRule.INSTANCE);
-    }
-
-    @Override
-    public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
-        PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());
-
-        return implementor.result(physType, Blocks.toBlock(
-            Expressions.call(table.getExpression(FlowFileTable.class), "project", Expressions.constant(fields))));
-    }
-
-}
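
The implement() method above is where the projection push-down pays off at runtime; the generated code is roughly equivalent to the call sketched here (the field index is an illustrative assumption):

    // Runtime effect of implement() (illustrative): Calcite's generated bytecode
    // amounts to
    //
    //     Enumerable<Object> rows = flowFileTable.project(new int[] {1});
    //
    // i.e. the scan delegates to FlowFileTable.project with the pushed-down
    // field indexes, and project() hands back a FlowFileEnumerator over them.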

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
new file mode 100644
index 0000000..e7b2c26
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.queryrecord;
+
+import java.io.InputStream;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
+    private final ProcessSession session;
+    private final FlowFile flowFile;
+    private final ComponentLog logger;
+    private final RecordReaderFactory recordParserFactory;
+    private final int[] fields;
+
+    private InputStream rawIn;
+    private Object currentRow;
+    private RecordReader recordParser;
+    private int recordsRead = 0;
+
+    public FlowFileEnumerator(final ProcessSession session, final FlowFile flowFile, final ComponentLog logger, final RecordReaderFactory parserFactory, final int[] fields) {
+        this.session = session;
+        this.flowFile = flowFile;
+        this.recordParserFactory = parserFactory;
+        this.logger = logger;
+        this.fields = fields;
+        reset();
+    }
+
+    @Override
+    public Object current() {
+        return currentRow;
+    }
+
+    @Override
+    public boolean moveNext() {
+        currentRow = null;
+        while (currentRow == null) {
+            try {
+                currentRow = filterColumns(recordParser.nextRecord());
+                break;
+            } catch (final Exception e) {
+                throw new ProcessException("Failed to read next record in stream for " + flowFile, e);
+            }
+        }
+
+        if (currentRow == null) {
+            // If we are out of data, close the InputStream. We do this because
+            // Calcite does not necessarily call our close() method.
+            close();
+            try {
+                onFinish();
+            } catch (final Exception e) {
+                logger.error("Failed to perform tasks when enumerator was finished", e);
+            }
+
+            return false;
+        }
+
+        recordsRead++;
+        return true;
+    }
+
+    protected int getRecordsRead() {
+        return recordsRead;
+    }
+
+    protected void onFinish() {
+    }
+
+    private Object filterColumns(final Record record) {
+        if (record == null) {
+            return null;
+        }
+
+        final Object[] row = record.getValues();
+
+        // If we want no fields or if the row is null, just return null
+        if (fields == null || row == null) {
+            return row;
+        }
+
+        // If we want only a single field, then Calcite is going to expect us to return
+        // the actual value, NOT a 1-element array of values.
+        if (fields.length == 1) {
+            final int desiredCellIndex = fields[0];
+            return row[desiredCellIndex];
+        }
+
+        // Create a new Object array that contains only the desired fields.
+        final Object[] filtered = new Object[fields.length];
+        for (int i = 0; i < fields.length; i++) {
+            final int indexToKeep = fields[i];
+            filtered[i] = row[indexToKeep];
+        }
+
+        return filtered;
+    }
+
+    @Override
+    public void reset() {
+        if (rawIn != null) {
+            try {
+                rawIn.close();
+            } catch (final Exception e) {
+                logger.warn("Could not close FlowFile's input due to " + e, e);
+            }
+        }
+
+        rawIn = session.read(flowFile);
+
+        try {
+            recordParser = recordParserFactory.createRecordReader(flowFile, rawIn, logger);
+        } catch (final Exception e) {
+            throw new ProcessException("Failed to reset stream", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (recordParser != null) {
+            try {
+                recordParser.close();
+            } catch (final Exception e) {
+                logger.warn("Failed to close Record Reader for " + flowFile, e);
+            }
+        }
+
+        try {
+            rawIn.close();
+        } catch (final Exception e) {
+            logger.warn("Failed to close InputStream for " + flowFile, e);
+        }
+    }
+}
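
For reference, a minimal sketch of how the Enumerator contract above is exercised (the `table`
variable is an assumed FlowFileTable instance; in practice Calcite's generated code, not user
code, performs this iteration):

    import org.apache.calcite.linq4j.Enumerable;
    import org.apache.calcite.linq4j.Enumerator;

    // Project columns 0 and 2 (indices illustrative) and walk the results.
    Enumerable<Object> rows = table.project(new int[] {0, 2});
    Enumerator<Object> enumerator = rows.enumerator();
    try {
        while (enumerator.moveNext()) {
            // With a single projected field this would be the bare value;
            // with two fields, as here, it is an Object[] row.
            Object row = enumerator.current();
            System.out.println(row);
        }
    } finally {
        enumerator.close();
    }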

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileProjectTableScanRule.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileProjectTableScanRule.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileProjectTableScanRule.java
new file mode 100644
index 0000000..a44a5d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileProjectTableScanRule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.queryrecord;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Planner rule that pushes a projection down into a {@link FlowFileTableScan}, so that
+ * the scan reads only the columns needed to satisfy the projection. If the projection's
+ * expressions are all trivial field references, the projection itself is removed.
+ */
+public class FlowFileProjectTableScanRule extends RelOptRule {
+    public static final FlowFileProjectTableScanRule INSTANCE = new FlowFileProjectTableScanRule();
+
+    private FlowFileProjectTableScanRule() {
+        super(
+            operand(LogicalProject.class,
+                operand(FlowFileTableScan.class, none())),
+            "FlowFileProjectTableScanRule");
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        final LogicalProject project = call.rel(0);
+        final FlowFileTableScan scan = call.rel(1);
+        final int[] fields = getProjectFields(project.getProjects());
+
+        if (fields == null) {
+            // Project contains expressions more complex than just field references.
+            return;
+        }
+
+        call.transformTo(
+            new FlowFileTableScan(
+                scan.getCluster(),
+                scan.getTable(),
+                scan.flowFileTable,
+                fields));
+    }
+
+    private int[] getProjectFields(List<RexNode> exps) {
+        final int[] fields = new int[exps.size()];
+
+        for (int i = 0; i < exps.size(); i++) {
+            final RexNode exp = exps.get(i);
+
+            if (exp instanceof RexInputRef) {
+                fields[i] = ((RexInputRef) exp).getIndex();
+            } else {
+                return null; // not a simple projection
+            }
+        }
+
+        return fields;
+    }
+}
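
To illustrate what this rule accomplishes: for a query such as SELECT "name", "age" FROM FLOWFILE
over a five-column schema (table and column names are assumed for this example), the planner
rewrites the plan roughly as follows, with the field indices matching the explainTerms output of
the scan:

    LogicalProject(name=[$1], age=[$3])
      FlowFileTableScan(table=[[FLOWFILE]], fields=[0, 1, 2, 3, 4])

    becomes

    FlowFileTableScan(table=[[FLOWFILE]], fields=[1, 3])

Because both projection expressions are simple RexInputRefs, getProjectFields returns their
indices and the LogicalProject node is eliminated entirely.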

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
new file mode 100644
index 0000000..bd15dc2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.queryrecord;
+
+import java.io.InputStream;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.QueryableTable;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.Pair;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+
+public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable, TranslatableTable {
+
+    private final RecordReaderFactory recordParserFactory;
+    private final ComponentLog logger;
+
+    private RecordSchema recordSchema;
+    private RelDataType relDataType = null;
+
+    private volatile ProcessSession session;
+    private volatile FlowFile flowFile;
+    private volatile int maxRecordsRead;
+
+    /**
+     * Creates a FlowFile table.
+     */
+    public FlowFileTable(final ProcessSession session, final FlowFile flowFile, final RecordReaderFactory recordParserFactory, final ComponentLog logger) {
+        this.session = session;
+        this.flowFile = flowFile;
+        this.recordParserFactory = recordParserFactory;
+        this.logger = logger;
+    }
+
+    public void setFlowFile(final ProcessSession session, final FlowFile flowFile) {
+        this.session = session;
+        this.flowFile = flowFile;
+        this.maxRecordsRead = 0;
+    }
+
+
+    @Override
+    public String toString() {
+        return "FlowFileTable";
+    }
+
+    /**
+     * Returns an enumerable over a given projection of the fields.
+     *
+     * <p>
+     * Called from generated code.
+     */
+    public Enumerable<Object> project(final int[] fields) {
+        return new AbstractEnumerable<Object>() {
+            @Override
+            @SuppressWarnings({"unchecked", "rawtypes"})
+            public Enumerator<Object> enumerator() {
+                return new FlowFileEnumerator(session, flowFile, logger, recordParserFactory, fields) {
+                    @Override
+                    protected void onFinish() {
+                        final int recordCount = getRecordsRead();
+                        if (recordCount > maxRecordsRead) {
+                            maxRecordsRead = recordCount;
+                        }
+                    }
+                };
+            }
+        };
+    }
+
+    public int getRecordsRead() {
+        return maxRecordsRead;
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public Expression getExpression(final SchemaPlus schema, final String tableName, final Class clazz) {
+        return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
+    }
+
+    @Override
+    public Type getElementType() {
+        return Object[].class;
+    }
+
+    @Override
+    public <T> Queryable<T> asQueryable(final QueryProvider queryProvider, final SchemaPlus schema, final String tableName) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable relOptTable) {
+        // Request all fields.
+        final int fieldCount = relOptTable.getRowType().getFieldCount();
+        final int[] fields = new int[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            fields[i] = i;
+        }
+
+        return new FlowFileTableScan(context.getCluster(), relOptTable, this, fields);
+    }
+
+    @Override
+    public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
+        if (relDataType != null) {
+            return relDataType;
+        }
+
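+        // Infer the schema by opening the FlowFile's content and asking the configured
+        // Record Reader for its schema; the resulting row type is cached in relDataType
+        // so that subsequent calls avoid re-reading the content.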
+        RecordSchema schema;
+        try (final InputStream in = session.read(flowFile)) {
+            final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, logger);
+            schema = recordParser.getSchema();
+        } catch (final Exception e) {
+            throw new ProcessException("Failed to determine schema of data records for " + flowFile, e);
+        }
+
+        final List<String> names = new ArrayList<>();
+        final List<RelDataType> types = new ArrayList<>();
+
+        final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
+        for (final RecordField field : schema.getFields()) {
+            names.add(field.getFieldName());
+            types.add(getRelDataType(field.getDataType(), javaTypeFactory));
+        }
+
+        logger.debug("Found Schema: {}", new Object[] {schema});
+
+        if (recordSchema == null) {
+            recordSchema = schema;
+        }
+
+        relDataType = typeFactory.createStructType(Pair.zip(names, types));
+        return relDataType;
+    }
+
+    private RelDataType getRelDataType(final DataType fieldType, final JavaTypeFactory typeFactory) {
+        switch (fieldType.getFieldType()) {
+            case BOOLEAN:
+                return typeFactory.createJavaType(boolean.class);
+            case BYTE:
+                return typeFactory.createJavaType(byte.class);
+            case CHAR:
+                return typeFactory.createJavaType(char.class);
+            case DATE:
+                return typeFactory.createJavaType(java.sql.Date.class);
+            case DOUBLE:
+                return typeFactory.createJavaType(double.class);
+            case FLOAT:
+                return typeFactory.createJavaType(float.class);
+            case INT:
+                return typeFactory.createJavaType(int.class);
+            case SHORT:
+                return typeFactory.createJavaType(short.class);
+            case TIME:
+                return typeFactory.createJavaType(java.sql.Time.class);
+            case TIMESTAMP:
+                return typeFactory.createJavaType(java.sql.Timestamp.class);
+            case LONG:
+                return typeFactory.createJavaType(long.class);
+            case STRING:
+                return typeFactory.createJavaType(String.class);
+            case ARRAY:
+                return typeFactory.createJavaType(Object[].class);
+            case RECORD:
+                return typeFactory.createJavaType(Object.class);
+            case MAP:
+                return typeFactory.createJavaType(HashMap.class);
+        }
+
+        throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType);
+    }
+
+    @Override
+    public TableType getJdbcTableType() {
+        return TableType.TEMPORARY_TABLE;
+    }
+}
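
A sketch of how a table like this is made queryable via Calcite's JDBC driver. This mirrors the
kind of wiring the QueryRecord processor performs internally (see QueryRecord.java earlier in
this commit); the "FLOWFILE" table name and the lex setting are assumptions for this sketch, and
flowFileTable stands for a FlowFileTable instance like the one defined above:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Properties;
    import org.apache.calcite.config.CalciteConnectionProperty;
    import org.apache.calcite.config.Lex;
    import org.apache.calcite.jdbc.CalciteConnection;
    import org.apache.calcite.schema.SchemaPlus;

    final Properties props = new Properties();
    props.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
    try (Connection connection = DriverManager.getConnection("jdbc:calcite:", props)) {
        final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
        final SchemaPlus rootSchema = calciteConnection.getRootSchema();
        // Register the FlowFile-backed table under an assumed name.
        rootSchema.add("FLOWFILE", flowFileTable);
        try (Statement stmt = calciteConnection.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM FLOWFILE")) {
            while (rs.next()) {
                // Each row exposes the projected record fields by column index or name.
            }
        }
    }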

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTableScan.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTableScan.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTableScan.java
new file mode 100644
index 0000000..afca202
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTableScan.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.queryrecord;
+
+import java.util.List;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.Blocks;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.Primitive;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+/**
+ * Relational expression representing a scan of a FlowFile.
+ *
+ * <p>
+ * Like any table scan, it serves as a leaf node of a query tree.
+ * </p>
+ */
+public class FlowFileTableScan extends TableScan implements EnumerableRel {
+    final FlowFileTable<?, ?> flowFileTable;
+    final int[] fields;
+
+    protected FlowFileTableScan(final RelOptCluster cluster, final RelOptTable table, final FlowFileTable<?, ?> flowFileTable, final int[] fields) {
+        super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table);
+
+        this.flowFileTable = flowFileTable;
+        this.fields = fields;
+    }
+
+    @Override
+    public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs) {
+        return new FlowFileTableScan(getCluster(), table, flowFileTable, fields);
+    }
+
+    @Override
+    public RelWriter explainTerms(final RelWriter pw) {
+        return super.explainTerms(pw).item("fields", Primitive.asList(fields));
+    }
+
+    @Override
+    public RelDataType deriveRowType() {
+        final List<RelDataTypeField> fieldList = table.getRowType().getFieldList();
+        final RelDataTypeFactory.FieldInfoBuilder builder = getCluster().getTypeFactory().builder();
+        for (int field : fields) {
+            builder.add(fieldList.get(field));
+        }
+        return builder.build();
+    }
+
+    @Override
+    public void register(RelOptPlanner planner) {
+        planner.addRule(FlowFileProjectTableScanRule.INSTANCE);
+    }
+
+    @Override
+    public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
+        PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());
+
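+        // Generate a block of code that calls FlowFileTable.project(int[]) with the
+        // selected field indices; the resulting Enumerable supplies the rows for the query.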
+        return implementor.result(physType, Blocks.toBlock(
+            Expressions.call(table.getExpression(FlowFileTable.class), "project", Expressions.constant(fields))));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 3891ee6..d85e663 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -76,7 +76,7 @@ org.apache.nifi.processors.standard.PutSyslog
 org.apache.nifi.processors.standard.PutTCP
 org.apache.nifi.processors.standard.PutUDP
 org.apache.nifi.processors.standard.QueryDatabaseTable
-org.apache.nifi.processors.standard.QueryFlowFile
+org.apache.nifi.processors.standard.QueryRecord
 org.apache.nifi.processors.standard.ReplaceText
 org.apache.nifi.processors.standard.RouteText
 org.apache.nifi.processors.standard.ReplaceTextWithMapping

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
deleted file mode 100644
index 0dffc0d..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
+++ /dev/null
@@ -1,48 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-    <!--
-      Licensed to the Apache Software Foundation (ASF) under one or more
-      contributor license agreements.  See the NOTICE file distributed with
-      this work for additional information regarding copyright ownership.
-      The ASF licenses this file to You under the Apache License, Version 2.0
-      (the "License"); you may not use this file except in compliance with
-      the License.  You may obtain a copy of the License at
-          http://www.apache.org/licenses/LICENSE-2.0
-      Unless required by applicable law or agreed to in writing, software
-      distributed under the License is distributed on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-      See the License for the specific language governing permissions and
-      limitations under the License.
-    -->
-    <head>
-        <meta charset="utf-8" />
-        <title>QueryFlowFile</title>
-
-        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
-    </head>
-
-    <body>
-    	<p>
-    		QueryFlowFile provides users a tremendous amount of power by leveraging an extremely well-known
-    		syntax (SQL) to route, filter, transform, and query data as it traverses the system. In order to
-    		provide the Processor with the maximum amount of flexibility, it is configured with a Controller
-    		Service that is responsible for reading and parsing the incoming FlowFiles and a Controller Service
-    		that is responsible for writing the results out. By using this paradigm, users are not forced to
-    		convert their data from one format to another just to query it, and then transform the data back
-    		into the form that they want. Rather, the appropriate Controller Service can easily be configured
-    		and put to use for the appropriate data format. 
-    	</p>
-    	
-    	<p>
-    		Rather than providing a single "SQL SELECT Statement" type of Property, this Processor makes use
-    		of user-defined properties. Each user-defined property that is added to the Processor has a name
-    		that becomes a new Relationship for the Processor and a corresponding SQL query that will be evaluated
-    		against each FlowFile. This allows multiple SQL queries to be run against each FlowFile.
-    	</p>
-    	
-    	<p>
-			The SQL syntax that is supported by this Processor is ANSI SQL and is powered by Apache Calcite. Please
-			note that identifiers are quoted using double-quotes, and column names/labels are case-insensitive.
-    	</p>
-	</body>
-</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html
new file mode 100644
index 0000000..93bbe2a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html
@@ -0,0 +1,48 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>QueryRecord</title>
+
+        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+    	<p>
+    		QueryRecord provides users with a tremendous amount of power by leveraging an extremely well-known
+    		syntax (SQL) to route, filter, transform, and query data as it traverses the system. To give the
+    		Processor the maximum amount of flexibility, it is configured with one Controller Service that is
+    		responsible for reading and parsing the incoming FlowFiles and another that is responsible for
+    		writing the results out. With this paradigm, users are not forced to convert their data from one
+    		format to another just to query it and then transform it back into the form that they want; instead,
+    		the appropriate Controller Service can easily be configured and put to use for the data format at hand.
+    	</p>
+    	
+    	<p>
+    		Rather than providing a single "SQL SELECT Statement" type of Property, this Processor makes use
+    		of user-defined properties. Each user-defined property that is added to the Processor has a name
+    		that becomes a new Relationship for the Processor, and a value that is the SQL query to be evaluated
+    		against each FlowFile. This allows multiple SQL queries to be run against each FlowFile, with the
+    		results of each query routed to its correspondingly named Relationship.
+    	</p>
+    	
+    	<p>
+			The SQL syntax that is supported by this Processor is ANSI SQL and is powered by Apache Calcite. Please
+			note that identifiers are quoted using double-quotes, and column names/labels are case-insensitive.
+    	</p>
+	</body>
+</html>
\ No newline at end of file
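
As an illustrative example of the user-defined-property pattern described above (the property
name, the FLOWFILE table name, and the "speed" field are all assumed here), a user could add a
property named over.speed.limit with the value:

    SELECT * FROM FLOWFILE WHERE speed > 65

This adds an over.speed.limit Relationship to the Processor, and the records matching the query
are written out to that Relationship using the configured writer.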