Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/11 23:33:31 UTC
[08/19] nifi git commit: NIFI-1280: Refactoring to make more generic
so that other data types can be supported;
created InputStreams to content on-demand so that multiple passes can be made
over FlowFile content if required. Created new Controller Service...
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
new file mode 100644
index 0000000..833a5d6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.queryflowfile.FlowFileTable;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.StopWatch;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", "text", "avro", "aggregate"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Evaluates one or more SQL queries against the contents of a FlowFile. The result of the "
+ + "SQL query then becomes the content of the output FlowFile. This can be used, for example, "
+ + "for field-specific filtering, transformation, and row-level filtering. "
+ + "Columns can be renamed, simple calculations and aggregations performed, etc. "
+ + "The Processor is configured with a Record Reader Controller Service and a Record Writer service so as to allow flexibility in incoming and outgoing data formats. "
+ + "The Processor must be configured with at least one user-defined property. The name of the Property "
+ + "is the Relationship to route data to, and the value of the Property is a SQL SELECT statement that is used to specify how input data should be transformed/filtered. "
+ + "The SQL statement must be valid ANSI SQL and is powered by Apache Calcite. "
+ + "If the transformation fails, the original FlowFile is routed to the 'failure' relationship. Otherwise, the data selected will be routed to the associated "
+ + "relationship. See the Processor Usage documentation for more information.")
+@DynamicRelationship(name="<Property Name>", description="Each user-defined property defines a new Relationship for this Processor.")
+@DynamicProperty(name = "The name of the relationship to route data to", value="A SQL SELECT statement that is used to determine what data should be routed to this "
+ + "relationship.", supportsExpressionLanguage=true, description="Each user-defined property specifies a SQL SELECT statement to run over the data, with the data "
+ + "that is selected being routed to the relationship whose name is the property name")
+public class QueryFlowFile extends AbstractProcessor {
+ static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+ .name("Record Reader")
+ .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+ .identifiesControllerService(RowRecordReaderFactory.class)
+ .required(true)
+ .build();
+ static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+ .name("Record Writer")
+ .description("Specifies the Controller Service to use for writing results to a FlowFile")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+ static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
+ .name("Include Zero Record FlowFiles")
+ .description("When running the SQL statement against an incoming FlowFile, if the result has no data, "
+ + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
+ .expressionLanguageSupported(false)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+ static final PropertyDescriptor CACHE_SCHEMA = new PropertyDescriptor.Builder()
+ .name("Cache Schema")
+ .description("Parsing the SQL query and deriving the FlowFile's schema is relatively expensive. If this value is set to true, "
+ + "the Processor will cache these values so that the Processor is much more efficient and much faster. However, if this is done, "
+ + "then the schema that is derived for the first FlowFile processed must apply to all FlowFiles. If all FlowFiles will not have the exact "
+ + "same schema, or if the SQL SELECT statement uses the Expression Language, this value should be set to false.")
+ .expressionLanguageSupported(false)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("The original FlowFile is routed to this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("If a FlowFile fails processing for any reason (for example, the SQL "
+ + "statement contains columns not present in input data), the original FlowFile it will "
+ + "be routed to this relationship")
+ .build();
+
+ private List<PropertyDescriptor> properties;
+ private final Set<Relationship> relationships = Collections.synchronizedSet(new HashSet<>());
+
+ private final Map<String, BlockingQueue<CachedStatement>> statementQueues = new HashMap<>();
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ try {
+ DriverManager.registerDriver(new org.apache.calcite.jdbc.Driver());
+ } catch (final SQLException e) {
+ throw new ProcessException("Failed to load Calcite JDBC Driver", e);
+ }
+
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER_FACTORY);
+ properties.add(RECORD_WRITER_FACTORY);
+ properties.add(INCLUDE_ZERO_RECORD_FLOWFILES);
+ properties.add(CACHE_SCHEMA);
+ this.properties = Collections.unmodifiableList(properties);
+
+ relationships.add(REL_FAILURE);
+ relationships.add(REL_ORIGINAL);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ if (!descriptor.isDynamic()) {
+ return;
+ }
+
+ final Relationship relationship = new Relationship.Builder()
+ .name(descriptor.getName())
+ .description("User-defined relationship that specifies where data that matches the specified SQL query should be routed")
+ .build();
+
+ if (newValue == null) {
+ relationships.remove(relationship);
+ } else {
+ relationships.add(relationship);
+ }
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final boolean cache = validationContext.getProperty(CACHE_SCHEMA).asBoolean();
+ if (cache) {
+ for (final PropertyDescriptor descriptor : validationContext.getProperties().keySet()) {
+ if (descriptor.isDynamic() && validationContext.isExpressionLanguagePresent(validationContext.getProperty(descriptor).getValue())) {
+ return Collections.singleton(new ValidationResult.Builder()
+ .subject("Cache Schema")
+ .input("true")
+ .valid(false)
+ .explanation("Cannot have 'Cache Schema' property set to true if any SQL statement makes use of the Expression Language")
+ .build());
+ }
+ }
+ }
+
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .description("SQL select statement specifies how data should be filtered/transformed. "
+ + "SQL SELECT should select from the FLOWFILE table")
+ .required(false)
+ .dynamic(true)
+ .expressionLanguageSupported(true)
+ .addValidator(new SqlValidator())
+ .build();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ final FlowFile original = session.get();
+ if (original == null) {
+ return;
+ }
+
+ final StopWatch stopWatch = new StopWatch(true);
+
+ final RecordSetWriterFactory resultSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY)
+ .asControllerService(RecordSetWriterFactory.class);
+ final RowRecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
+ .asControllerService(RowRecordReaderFactory.class);
+
+ final RecordSetWriter resultSetWriter = resultSetWriterFactory.createWriter(getLogger());
+ final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
+ final Set<FlowFile> createdFlowFiles = new HashSet<>();
+
+ try {
+ for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+ if (!descriptor.isDynamic()) {
+ continue;
+ }
+
+ final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();
+
+ // We have to fork a child because we may need to read the input FlowFile more than once,
+ // and we cannot call session.read() on the original FlowFile while we are within a write
+ // callback for the original FlowFile.
+ FlowFile transformed = session.create(original);
+
+ // Ensure that we have the FlowFile in the map in case we throw any Exception
+ createdFlowFiles.add(transformed);
+
+ final String sql = context.getProperty(descriptor).evaluateAttributeExpressions(original).getValue();
+ final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
+ final QueryResult queryResult;
+ if (context.getProperty(CACHE_SCHEMA).asBoolean()) {
+ queryResult = queryWithCache(session, original, sql, context, recordParserFactory);
+ } else {
+ queryResult = query(session, original, sql, context, recordParserFactory);
+ }
+
+ try {
+ final ResultSet rs = queryResult.getResultSet();
+ transformed = session.write(transformed, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ try {
+ final ResultSetRecordSet recordSet = new ResultSetRecordSet(rs);
+ writeResultRef.set(resultSetWriter.write(recordSet, out));
+ } catch (final Exception e) {
+ throw new IOException(e);
+ }
+ }
+ });
+ } finally {
+ closeQuietly(queryResult);
+ }
+
+ final WriteResult result = writeResultRef.get();
+ if (result.getRecordCount() == 0 && !context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean()) {
+ session.remove(transformed);
+ transformedFlowFiles.remove(transformed);
+ getLogger().info("Transformed {} but the result contained no data so will not pass on a FlowFile", new Object[] {original});
+ } else {
+ final Map<String, String> attributesToAdd = new HashMap<>();
+ if (result.getAttributes() != null) {
+ attributesToAdd.putAll(result.getAttributes());
+ }
+
+ attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), resultSetWriter.getMimeType());
+ attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
+ transformed = session.putAllAttributes(transformed, attributesToAdd);
+ transformedFlowFiles.put(transformed, relationship);
+ }
+ }
+
+ final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+ if (transformedFlowFiles.size() > 0) {
+ session.getProvenanceReporter().fork(original, transformedFlowFiles.keySet(), elapsedMillis);
+
+ for (final Map.Entry<FlowFile, Relationship> entry : transformedFlowFiles.entrySet()) {
+ final FlowFile transformed = entry.getKey();
+ final Relationship relationship = entry.getValue();
+
+ session.getProvenanceReporter().route(transformed, relationship);
+ session.transfer(transformed, relationship);
+ }
+ }
+
+ getLogger().info("Successfully transformed {} in {} millis", new Object[] {original, elapsedMillis});
+ session.transfer(original, REL_ORIGINAL);
+ } catch (final ProcessException e) {
+ getLogger().error("Unable to transform {} due to {}", new Object[] {original, e});
+ session.remove(createdFlowFiles);
+ session.transfer(original, REL_FAILURE);
+ } catch (final SQLException e) {
+ getLogger().error("Unable to transform {} due to {}", new Object[] {original, e.getCause() == null ? e : e.getCause()});
+ session.remove(createdFlowFiles);
+ session.transfer(original, REL_FAILURE);
+ }
+ }
+
+ private synchronized CachedStatement getStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
+ final FlowFile flowFile, final RowRecordReaderFactory recordReaderFactory) throws SQLException {
+
+ final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
+ if (statementQueue == null) {
+ return buildCachedStatement(sql, connectionSupplier, session, flowFile, recordReaderFactory);
+ }
+
+ final CachedStatement cachedStmt = statementQueue.poll();
+ if (cachedStmt != null) {
+ return cachedStmt;
+ }
+
+ return buildCachedStatement(sql, connectionSupplier, session, flowFile, recordReaderFactory);
+ }
+
+ private CachedStatement buildCachedStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
+ final FlowFile flowFile, final RowRecordReaderFactory recordReaderFactory) throws SQLException {
+
+ final CalciteConnection connection = connectionSupplier.get();
+ final SchemaPlus rootSchema = connection.getRootSchema();
+
+ final FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordReaderFactory, getLogger());
+ rootSchema.add("FLOWFILE", flowFileTable);
+ rootSchema.setCacheEnabled(false);
+
+ final PreparedStatement stmt = connection.prepareStatement(sql);
+ return new CachedStatement(stmt, flowFileTable, connection);
+ }
+
+ @OnStopped
+ public synchronized void cleanup() {
+ for (final BlockingQueue<CachedStatement> statementQueue : statementQueues.values()) {
+ CachedStatement stmt;
+ while ((stmt = statementQueue.poll()) != null) {
+ closeQuietly(stmt.getStatement(), stmt.getConnection());
+ }
+ }
+
+ statementQueues.clear();
+ }
+
+ @OnScheduled
+ public synchronized void setupQueues(final ProcessContext context) {
+ // Create a Queue of PreparedStatements for each property that is user-defined. This allows us to easily poll the
+ // queue and add as necessary, knowing that the queue already exists.
+ for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+ if (!descriptor.isDynamic()) {
+ continue;
+ }
+
+ final String sql = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
+ final BlockingQueue<CachedStatement> queue = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+ statementQueues.put(sql, queue);
+ }
+ }
+
+ protected QueryResult queryWithCache(final ProcessSession session, final FlowFile flowFile, final String sql, final ProcessContext context,
+ final RowRecordReaderFactory recordParserFactory) throws SQLException {
+
+ final Supplier<CalciteConnection> connectionSupplier = () -> {
+ final Properties properties = new Properties();
+ properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.JAVA.name());
+
+ try {
+ final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
+ final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+ return calciteConnection;
+ } catch (final Exception e) {
+ throw new ProcessException(e);
+ }
+ };
+
+ final CachedStatement cachedStatement = getStatement(sql, connectionSupplier, session, flowFile, recordParserFactory);
+ final PreparedStatement stmt = cachedStatement.getStatement();
+ final FlowFileTable<?, ?> table = cachedStatement.getTable();
+ table.setFlowFile(session, flowFile);
+
+ final ResultSet rs = stmt.executeQuery();
+
+ return new QueryResult() {
+ @Override
+ public void close() throws IOException {
+ final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
+ if (statementQueue == null || !statementQueue.offer(cachedStatement)) {
+ try {
+ cachedStatement.getConnection().close();
+ } catch (SQLException e) {
+ throw new IOException("Failed to close statement", e);
+ }
+ }
+ }
+
+ @Override
+ public ResultSet getResultSet() {
+ return rs;
+ }
+ };
+ }
+
+ protected QueryResult query(final ProcessSession session, final FlowFile flowFile, final String sql, final ProcessContext context,
+ final RowRecordReaderFactory recordParserFactory) throws SQLException {
+
+ final Properties properties = new Properties();
+ properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.JAVA.name());
+
+ Connection connection = null;
+ ResultSet resultSet = null;
+ Statement statement = null;
+ try {
+ connection = DriverManager.getConnection("jdbc:calcite:", properties);
+ final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+ final SchemaPlus rootSchema = calciteConnection.getRootSchema();
+
+ final FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordParserFactory, getLogger());
+ rootSchema.add("FLOWFILE", flowFileTable);
+ rootSchema.setCacheEnabled(false);
+
+ statement = connection.createStatement();
+ resultSet = statement.executeQuery(sql);
+
+ final ResultSet rs = resultSet;
+ final Statement stmt = statement;
+ final Connection conn = connection;
+ return new QueryResult() {
+ @Override
+ public void close() throws IOException {
+ closeQuietly(rs, stmt, conn);
+ }
+
+ @Override
+ public ResultSet getResultSet() {
+ return rs;
+ }
+ };
+ } catch (final Exception e) {
+ closeQuietly(resultSet, statement, connection);
+ throw e;
+ }
+ }
+
+ private void closeQuietly(final AutoCloseable... closeables) {
+ if (closeables == null) {
+ return;
+ }
+
+ for (final AutoCloseable closeable : closeables) {
+ if (closeable == null) {
+ continue;
+ }
+
+ try {
+ closeable.close();
+ } catch (final Exception e) {
+ getLogger().warn("Failed to close SQL resource", e);
+ }
+ }
+ }
+
+ private static class SqlValidator implements Validator {
+ @Override
+ public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+ final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
+ final SqlParser parser = SqlParser.create(substituted);
+ try {
+ parser.parseStmt();
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(true)
+ .build();
+ } catch (final Exception e) {
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(false)
+ .explanation("Not a valid SQL Statement: " + e.getMessage())
+ .build();
+ }
+ }
+ }
+
+ private interface QueryResult extends Closeable {
+ ResultSet getResultSet();
+ }
+
+ private static class CachedStatement {
+ private final FlowFileTable<?, ?> table;
+ private final PreparedStatement statement;
+ private final Connection connection;
+
+ public CachedStatement(final PreparedStatement statement, final FlowFileTable<?, ?> table, final Connection connection) {
+ this.statement = statement;
+ this.table = table;
+ this.connection = connection;
+ }
+
+ public FlowFileTable<?, ?> getTable() {
+ return table;
+ }
+
+ public PreparedStatement getStatement() {
+ return statement;
+ }
+
+ public Connection getConnection() {
+ return connection;
+ }
+ }
+}
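
A minimal sketch of the configuration contract the annotations above describe: each dynamic property's name becomes a Relationship, and its value is the SQL to run against the FLOWFILE table. The "parser" and "writer" service identifiers and the "adults" relationship name are illustrative; the Record Reader/Writer controller services are assumed to be implemented and enabled elsewhere (the tests later in this commit use mock implementations).

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // Hedged sketch; assumes "parser" and "writer" controller services
    // have already been added to and enabled on the runner.
    TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
    runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
    runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
    // The property name "adults" becomes a new Relationship; the value is the
    // SQL SELECT that is evaluated over each incoming FlowFile's records.
    runner.setProperty("adults", "SELECT name, age FROM FLOWFILE WHERE age >= 18");
    runner.enqueue(new byte[0]);
    runner.run();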
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
new file mode 100644
index 0000000..1a62d14
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.queryflowfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
+ private final ProcessSession session;
+ private final FlowFile flowFile;
+ private final ComponentLog logger;
+ private final RowRecordReaderFactory recordParserFactory;
+ private final int[] fields;
+
+ private InputStream rawIn;
+ private Object currentRow;
+ private RecordReader recordParser;
+
+ public FlowFileEnumerator(final ProcessSession session, final FlowFile flowFile, final ComponentLog logger, final RowRecordReaderFactory parserFactory, final int[] fields) {
+ this.session = session;
+ this.flowFile = flowFile;
+ this.recordParserFactory = parserFactory;
+ this.logger = logger;
+ this.fields = fields;
+ reset();
+ }
+
+ @Override
+ public Object current() {
+ return currentRow;
+ }
+
+ @Override
+ public boolean moveNext() {
+ currentRow = null;
+ while (currentRow == null) {
+ try {
+ currentRow = filterColumns(recordParser.nextRecord());
+ break;
+ } catch (final IOException e) {
+ logger.error("Failed to read next record in stream for " + flowFile + ". Assuming end of stream.", e);
+ currentRow = null;
+ break;
+ } catch (final MalformedRecordException mre) {
+ logger.error("Failed to parse record in stream for " + flowFile + ". Will skip record and continue reading", mre);
+ }
+ }
+
+ if (currentRow == null) {
+ // If we are out of data, close the InputStream. We do this because
+ // Calcite does not necessarily call our close() method.
+ close();
+ }
+ return (currentRow != null);
+ }
+
+ private Object filterColumns(final Record record) {
+ if (record == null) {
+ return null;
+ }
+
+ final Object[] row = record.getValues();
+
+ // If no projection was requested, or the row itself is null, return the row as-is.
+ if (fields == null || row == null) {
+ return row;
+ }
+
+ // If we want only a single field, then Calcite is going to expect us to return
+ // the actual value, NOT a 1-element array of values.
+ if (fields.length == 1) {
+ final int desiredCellIndex = fields[0];
+ return row[desiredCellIndex];
+ }
+
+ // If the row has no more columns than were requested, return it as-is.
+ if (row.length <= fields.length) {
+ return row;
+ }
+
+ // Create a new Object array that contains only the desired fields.
+ final Object[] filtered = new Object[fields.length];
+ for (int i = 0; i < fields.length; i++) {
+ final int indexToKeep = fields[i];
+ filtered[i] = row[indexToKeep];
+ }
+
+ return filtered;
+ }
+
+ @Override
+ public void reset() {
+ if (rawIn != null) {
+ try {
+ rawIn.close();
+ } catch (final Exception e) {
+ logger.warn("Could not close FlowFile's input due to " + e, e);
+ }
+ }
+
+ rawIn = session.read(flowFile);
+
+ try {
+ recordParser = recordParserFactory.createRecordReader(rawIn, logger);
+ } catch (final MalformedRecordException | IOException e) {
+ throw new ProcessException("Failed to reset stream", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (recordParser != null) {
+ try {
+ recordParser.close();
+ } catch (final Exception e) {
+ logger.warn("Failed to close decorated source for " + flowFile, e);
+ }
+ }
+
+ try {
+ rawIn.close();
+ } catch (final Exception e) {
+ logger.warn("Failed to close InputStream for " + flowFile, e);
+ }
+ }
+}
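
FlowFileEnumerator implements Calcite's Enumerator contract: moveNext() advances to the next row and reports whether one exists, and current() returns that row. A minimal sketch of how such an enumerator is driven, using Calcite's Linq4j helper over an in-memory list (constructing a real FlowFileEnumerator would require a live ProcessSession):

    import java.util.Arrays;
    import org.apache.calcite.linq4j.Enumerator;
    import org.apache.calcite.linq4j.Linq4j;

    public class EnumeratorDemo {
        public static void main(String[] args) {
            // Hedged sketch of the protocol Calcite's generated code drives
            // against FlowFileEnumerator: moveNext() advances and reports
            // whether a row exists; current() returns that row.
            Enumerator<Object> rows = Linq4j.enumerator(Arrays.<Object>asList("a", "b"));
            while (rows.moveNext()) {
                Object row = rows.current(); // one value, or an Object[] of columns
                System.out.println(row);
            }
            rows.close(); // FlowFileEnumerator also closes its InputStream here
        }
    }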
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java
new file mode 100644
index 0000000..c5179c9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.queryflowfile;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Planner rule that projects from a {@link FlowFileTableScan} just the columns
+ * needed to satisfy a projection. If the projection's expressions are trivial,
+ * the projection is removed.
+ */
+public class FlowFileProjectTableScanRule extends RelOptRule {
+ public static final FlowFileProjectTableScanRule INSTANCE = new FlowFileProjectTableScanRule();
+
+ private FlowFileProjectTableScanRule() {
+ super(
+ operand(LogicalProject.class,
+ operand(FlowFileTableScan.class, none())),
+ "FlowFileProjectTableScanRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final LogicalProject project = call.rel(0);
+ final FlowFileTableScan scan = call.rel(1);
+ final int[] fields = getProjectFields(project.getProjects());
+
+ if (fields == null) {
+ // Project contains expressions more complex than just field references.
+ return;
+ }
+
+ call.transformTo(
+ new FlowFileTableScan(
+ scan.getCluster(),
+ scan.getTable(),
+ scan.flowFileTable,
+ fields));
+ }
+
+ private int[] getProjectFields(List<RexNode> exps) {
+ final int[] fields = new int[exps.size()];
+
+ for (int i = 0; i < exps.size(); i++) {
+ final RexNode exp = exps.get(i);
+
+ if (exp instanceof RexInputRef) {
+ fields[i] = ((RexInputRef) exp).getIndex();
+ } else {
+ return null; // not a simple projection
+ }
+ }
+
+ return fields;
+ }
+}
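
A worked example of the rule (column order illustrative): over a FLOWFILE table with columns (id, name, age), the query "SELECT name, age FROM FLOWFILE" projects RexInputRef #1 and RexInputRef #2, so getProjectFields returns {1, 2} and the transformed scan reads only those two columns. An expression such as "age + 1" is not a RexInputRef, so getProjectFields returns null and onMatch leaves the plan unchanged, falling back to a full scan followed by a projection node.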
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
new file mode 100644
index 0000000..a23dcfa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.queryflowfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.QueryableTable;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.Pair;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+
+public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable, TranslatableTable {
+
+ private final RowRecordReaderFactory recordParserFactory;
+ private final ComponentLog logger;
+
+ private RecordSchema recordSchema;
+ private RelDataType relDataType = null;
+
+ private volatile ProcessSession session;
+ private volatile FlowFile flowFile;
+
+ /**
+ * Creates a FlowFile table.
+ */
+ public FlowFileTable(final ProcessSession session, final FlowFile flowFile, final RowRecordReaderFactory recordParserFactory, final ComponentLog logger) {
+ this.session = session;
+ this.flowFile = flowFile;
+ this.recordParserFactory = recordParserFactory;
+ this.logger = logger;
+ }
+
+ public void setFlowFile(final ProcessSession session, final FlowFile flowFile) {
+ this.session = session;
+ this.flowFile = flowFile;
+ }
+
+
+ @Override
+ public String toString() {
+ return "FlowFileTable";
+ }
+
+ /**
+ * Returns an enumerable over a given projection of the fields.
+ *
+ * <p>
+ * Called from generated code.
+ */
+ public Enumerable<Object> project(final int[] fields) {
+ return new AbstractEnumerable<Object>() {
+ @Override
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public Enumerator<Object> enumerator() {
+ return new FlowFileEnumerator(session, flowFile, logger, recordParserFactory, fields);
+ }
+ };
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Expression getExpression(final SchemaPlus schema, final String tableName, final Class clazz) {
+ return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
+ }
+
+ @Override
+ public Type getElementType() {
+ return Object[].class;
+ }
+
+ @Override
+ public <T> Queryable<T> asQueryable(final QueryProvider queryProvider, final SchemaPlus schema, final String tableName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable relOptTable) {
+ // Request all fields.
+ final int fieldCount = relOptTable.getRowType().getFieldCount();
+ final int[] fields = new int[fieldCount];
+ for (int i = 0; i < fieldCount; i++) {
+ fields[i] = i;
+ }
+
+ return new FlowFileTableScan(context.getCluster(), relOptTable, this, fields);
+ }
+
+ @Override
+ public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
+ if (relDataType != null) {
+ return relDataType;
+ }
+
+ RecordSchema schema;
+ try (final InputStream in = session.read(flowFile)) {
+ final RecordReader recordParser = recordParserFactory.createRecordReader(in, logger);
+ schema = recordParser.getSchema();
+ } catch (final MalformedRecordException | IOException e) {
+ throw new ProcessException("Failed to determine schema of data records for " + flowFile, e);
+ }
+
+ final List<String> names = new ArrayList<>();
+ final List<RelDataType> types = new ArrayList<>();
+
+ final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
+ for (final RecordField field : schema.getFields()) {
+ names.add(field.getFieldName());
+ types.add(getRelDataType(field.getDataType(), javaTypeFactory));
+ }
+
+ logger.debug("Found Schema: {}", new Object[] {schema});
+
+ if (recordSchema == null) {
+ recordSchema = schema;
+ }
+
+ relDataType = typeFactory.createStructType(Pair.zip(names, types));
+ return relDataType;
+ }
+
+ private RelDataType getRelDataType(final DataType fieldType, final JavaTypeFactory typeFactory) {
+ switch (fieldType.getFieldType()) {
+ case BOOLEAN:
+ return typeFactory.createJavaType(boolean.class);
+ case BYTE:
+ return typeFactory.createJavaType(byte.class);
+ case CHAR:
+ return typeFactory.createJavaType(char.class);
+ case DATE:
+ return typeFactory.createJavaType(java.sql.Date.class);
+ case DOUBLE:
+ return typeFactory.createJavaType(double.class);
+ case FLOAT:
+ return typeFactory.createJavaType(float.class);
+ case INT:
+ return typeFactory.createJavaType(int.class);
+ case SHORT:
+ return typeFactory.createJavaType(short.class);
+ case TIME:
+ return typeFactory.createJavaType(java.sql.Time.class);
+ case TIMESTAMP:
+ return typeFactory.createJavaType(java.sql.Timestamp.class);
+ case LONG:
+ return typeFactory.createJavaType(long.class);
+ case STRING:
+ return typeFactory.createJavaType(String.class);
+ case ARRAY:
+ return typeFactory.createJavaType(Object[].class);
+ case OBJECT:
+ return typeFactory.createJavaType(Object.class);
+ }
+
+ throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType);
+ }
+
+ @Override
+ public TableType getJdbcTableType() {
+ return TableType.TEMPORARY_TABLE;
+ }
+}
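
As a worked example of getRowType and getRelDataType (field names illustrative): an incoming record schema with fields name (STRING) and age (INT) yields a two-column struct row type of (name: String, age: int). That derived row type is what allows a statement such as "SELECT name, age FROM FLOWFILE WHERE age > 21" to type-check in Calcite's planner, and deriving it requires reading the FlowFile's content once, which is the per-FlowFile cost the processor's Cache Schema property exists to avoid.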
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java
new file mode 100644
index 0000000..ad3a1c3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.queryflowfile;
+
+import java.util.List;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.Blocks;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.Primitive;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+/**
+ * Relational expression representing a scan of a FlowFile.
+ *
+ * <p>
+ * Like any table scan, it serves as a leaf node of a query tree.
+ * </p>
+ */
+public class FlowFileTableScan extends TableScan implements EnumerableRel {
+ final FlowFileTable<?, ?> flowFileTable;
+ final int[] fields;
+
+ protected FlowFileTableScan(final RelOptCluster cluster, final RelOptTable table, final FlowFileTable<?, ?> flowFileTable, final int[] fields) {
+ super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table);
+
+ this.flowFileTable = flowFileTable;
+ this.fields = fields;
+ }
+
+ @Override
+ public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs) {
+ return new FlowFileTableScan(getCluster(), table, flowFileTable, fields);
+ }
+
+ @Override
+ public RelWriter explainTerms(final RelWriter pw) {
+ return super.explainTerms(pw).item("fields", Primitive.asList(fields));
+ }
+
+ @Override
+ public RelDataType deriveRowType() {
+ final List<RelDataTypeField> fieldList = table.getRowType().getFieldList();
+ final RelDataTypeFactory.FieldInfoBuilder builder = getCluster().getTypeFactory().builder();
+ for (int field : fields) {
+ builder.add(fieldList.get(field));
+ }
+ return builder.build();
+ }
+
+ @Override
+ public void register(RelOptPlanner planner) {
+ planner.addRule(FlowFileProjectTableScanRule.INSTANCE);
+ }
+
+ @Override
+ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
+ PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());
+
+ return implementor.result(physType, Blocks.toBlock(
+ Expressions.call(table.getExpression(FlowFileTable.class), "project", Expressions.constant(fields))));
+ }
+
+}
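
To make implement() concrete (field indices illustrative): for a scan with fields {1, 2}, the block handed to Calcite behaves roughly like calling flowFileTable.project(new int[] {1, 2}); Calcite compiles that expression and iterates the resulting Enumerable, which in turn hands out FlowFileEnumerator instances over the FlowFile's content.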
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9de5ab6..2f2b0cb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -75,6 +75,7 @@ org.apache.nifi.processors.standard.PutSyslog
org.apache.nifi.processors.standard.PutTCP
org.apache.nifi.processors.standard.PutUDP
org.apache.nifi.processors.standard.QueryDatabaseTable
+org.apache.nifi.processors.standard.QueryFlowFile
org.apache.nifi.processors.standard.ReplaceText
org.apache.nifi.processors.standard.RouteText
org.apache.nifi.processors.standard.ReplaceTextWithMapping
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
new file mode 100644
index 0000000..1cc7923
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
@@ -0,0 +1,47 @@
+<!DOCTYPE html>
+<html lang="en">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <head>
+ <meta charset="utf-8" />
+ <title>QueryFlowFile</title>
+
+ <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+ </head>
+
+ <body>
+ <p>
+ QueryFlowFile gives users a great deal of power by leveraging a well-known
+ syntax (SQL) to route, filter, transform, and query data as it traverses the system. To
+ provide the Processor with maximum flexibility, it is configured with one Controller
+ Service that is responsible for reading and parsing the incoming FlowFiles and another
+ that is responsible for writing the results out. With this paradigm, users are not forced to
+ convert their data from one format to another just to query it, and then transform the data
+ back into the form that they want. Rather, the appropriate Controller Service can simply be
+ configured and put to use for the appropriate data format.
+ </p>
+
+ <p>
+ Rather than providing a single "SQL SELECT Statement" type of Property, this Processor makes use
+ of user-defined properties. Each user-defined property that is added to the Processor creates
+ a new Relationship named after the property, and the property's value is a SQL query that will be
+ evaluated against each FlowFile. This allows multiple SQL queries to be run against each FlowFile.
+ </p>
+
+ <p>
+ The SQL syntax that is supported by this Processor is ANSI SQL and is powered by Apache Calcite.
+ </p>
+ </body>
+</html>
\ No newline at end of file
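
As an illustration of the configuration the documentation above describes (the property name is invented for the example; the query is taken from the tests later in this commit): adding a user-defined property named "high-total" whose value is

    SELECT ID, AMOUNT1 + AMOUNT2 + AMOUNT3 AS TOTAL FROM FLOWFILE WHERE ID = 100

creates a "high-total" relationship; for each incoming FlowFile, the rows that query selects are written out by the configured Record Writer and routed to that relationship, while the incoming FlowFile itself goes to "original".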
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java
deleted file mode 100644
index 421da98..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestFilterCSVColumns {
-
- private static final Logger LOGGER;
-
- static {
- System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
- System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.FilterCSVColumns", "debug");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestFilterCSVColumns", "debug");
- LOGGER = LoggerFactory.getLogger(TestFilterCSVColumns.class);
- }
-
- @Test
- public void testTransformSimple() throws InitializationException, IOException, SQLException {
- String sql = "select first_name, last_name, company_name, address, city from CSV.A where city='New York'";
-
- Path inpath = Paths.get("src/test/resources/TestFilterCSVColumns/US500.csv");
- InputStream in = new FileInputStream(inpath.toFile());
-
- ResultSet resultSet = FilterCSVColumns.transform(in, sql);
-
- int nrofColumns = resultSet.getMetaData().getColumnCount();
-
- for (int i = 1; i <= nrofColumns; i++) {
- System.out.print(resultSet.getMetaData().getColumnLabel(i) + " ");
- }
- System.out.println();
-
- while (resultSet.next()) {
- for (int i = 1; i <= nrofColumns; i++) {
- System.out.print(resultSet.getString(i)+ " ");
- }
- System.out.println();
- }
- }
-
- @Test
- public void testTransformCalc() throws InitializationException, IOException, SQLException {
- String sql = "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from CSV.A where ID=100";
-
- Path inpath = Paths.get("src/test/resources/TestFilterCSVColumns/Numeric.csv");
- InputStream in = new FileInputStream(inpath.toFile());
-
- ResultSet resultSet = FilterCSVColumns.transform(in, sql);
-
- int nrofColumns = resultSet.getMetaData().getColumnCount();
-
- for (int i = 1; i <= nrofColumns; i++) {
- System.out.print(resultSet.getMetaData().getColumnLabel(i) + " ");
- }
- System.out.println();
-
- while (resultSet.next()) {
- for (int i = 1; i <= nrofColumns; i++) {
- System.out.print(resultSet.getString(i)+ " ");
- }
- double total = resultSet.getDouble("TOTAL");
- System.out.println();
- assertEquals(90.75, total, 0.0001);
- }
- }
-
- @Test
- public void testSimpleTypeless() throws InitializationException, IOException {
- final TestRunner runner = TestRunners.newTestRunner(FilterCSVColumns.class);
- String sql = "select first_name, last_name, company_name, address, city from CSV.A where city='New York'";
- runner.setProperty(FilterCSVColumns.SQL_SELECT, sql);
-
- runner.enqueue(Paths.get("src/test/resources/TestFilterCSVColumns/US500_typeless.csv"));
- runner.run();
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS);
- for (final MockFlowFile flowFile : flowFiles) {
- System.out.println(flowFile);
- System.out.println(new String(flowFile.toByteArray()));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
new file mode 100644
index 0000000..41469ba
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestQueryFlowFile {
+
+ static {
+ System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+ System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+ System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform", "debug");
+ }
+
+ private static final String REL_NAME = "success";
+
+ @Test
+ public void testSimple() throws InitializationException, IOException, SQLException {
+ final MockRecordParser parser = new MockRecordParser();
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("age", RecordFieldType.INT);
+ parser.addRecord("Tom", 49);
+
+ final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+ final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
+ runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
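+ // numIterations can be raised to exercise the processor concurrently under the 4-thread
+ // configuration below; a single FlowFile suffices for this assertion.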
+ final int numIterations = 1;
+ for (int i = 0; i < numIterations; i++) {
+ runner.enqueue(new byte[0]);
+ }
+
+ runner.setThreadCount(4);
+ runner.run(2 * numIterations);
+
+ runner.assertTransferCount(REL_NAME, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ System.out.println(new String(out.toByteArray()));
+ out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
+ }
+
+ @Test
+ public void testParseFailure() throws InitializationException, IOException, SQLException {
+ // Parser is configured to throw MalformedRecordException on the first record read,
+ // simulating unparseable content. This assumes QueryFlowFile routes FlowFiles whose
+ // records cannot be parsed to a relationship named "failure".
+ final MockRecordParser parser = new MockRecordParser(0);
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("age", RecordFieldType.INT);
+ parser.addRecord("Tom", 49);
+
+ final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+ final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
+ runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 0);
+ runner.assertTransferCount("failure", 1);
+ }
+
+
+ @Test
+ public void testTransformCalc() throws InitializationException, IOException, SQLException {
+ final MockRecordParser parser = new MockRecordParser();
+ parser.addSchemaField("ID", RecordFieldType.INT);
+ parser.addSchemaField("AMOUNT1", RecordFieldType.FLOAT);
+ parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT);
+ parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT);
+
+ parser.addRecord("008", 10.05F, 15.45F, 89.99F);
+ parser.addRecord("100", 20.25F, 25.25F, 45.25F);
+ parser.addRecord("105", 20.05F, 25.05F, 45.05F);
+ parser.addRecord("200", 34.05F, 25.05F, 75.05F);
+
+ final MockRecordWriter writer = new MockRecordWriter("\"NAME\",\"POINTS\"");
+
+ final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(REL_NAME, "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from FLOWFILE where ID=100");
+ runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+
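+ // For ID 100: 20.25 + 25.25 + 45.25 = 90.75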
+ out.assertContentEquals("\"NAME\",\"POINTS\"\n\"100\",\"90.75\"\n");
+ }
+
+
+ @Test
+ public void testAggregateFunction() throws InitializationException, IOException {
+ final MockRecordParser parser = new MockRecordParser();
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("points", RecordFieldType.INT);
+ parser.addRecord("Tom", 1);
+ parser.addRecord("Jerry", 2);
+ parser.addRecord("Tom", 99);
+
+ final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+ final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(REL_NAME, "select name, sum(points) as points from FLOWFILE GROUP BY name");
+ runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
+ runner.enqueue("");
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+ final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n");
+ }
+
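+ // Ensures the writer sees the projected columns, in order, including the FAV_GREETING
+ // alias added by the query.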
+ @Test
+ public void testColumnNames() throws InitializationException, IOException {
+ final MockRecordParser parser = new MockRecordParser();
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("points", RecordFieldType.INT);
+ parser.addSchemaField("greeting", RecordFieldType.STRING);
+ parser.addRecord("Tom", 1, "Hello");
+ parser.addRecord("Jerry", 2, "Hi");
+ parser.addRecord("Tom", 99, "Howdy");
+
+ final List<String> colNames = new ArrayList<>();
+ colNames.add("name");
+ colNames.add("points");
+ colNames.add("greeting");
+ colNames.add("FAV_GREETING");
+ final ResultSetValidatingRecordWriter writer = new ResultSetValidatingRecordWriter(colNames);
+
+ final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(REL_NAME, "select *, greeting AS FAV_GREETING from FLOWFILE");
+ runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
+ runner.enqueue("");
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+ }
+
+
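+ /**
+ * RecordSetWriterFactory stub whose writer only asserts that the result set's column
+ * names match the expected list; no content is written.
+ */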
+ private static class ResultSetValidatingRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
+ private final List<String> columnNames;
+
+ public ResultSetValidatingRecordWriter(final List<String> colNames) {
+ this.columnNames = new ArrayList<>(colNames);
+ }
+
+ @Override
+ public RecordSetWriter createWriter(ComponentLog logger) {
+ return new RecordSetWriter() {
+ @Override
+ public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+ final int colCount = rs.getSchema().getFieldCount();
+ Assert.assertEquals(columnNames.size(), colCount);
+
+ final List<String> colNames = new ArrayList<>(colCount);
+ for (int i = 0; i < colCount; i++) {
+ colNames.add(rs.getSchema().getField(i).getFieldName());
+ }
+
+ Assert.assertEquals(columnNames, colNames);
+
+ return WriteResult.of(0, Collections.emptyMap());
+ }
+
+ @Override
+ public String getMimeType() {
+ return "text/plain";
+ }
+
+ @Override
+ public WriteResult write(Record record, OutputStream out) throws IOException {
+ // Single-record writes are not exercised by these tests.
+ return null;
+ }
+ };
+ }
+
+ }
+
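+ /**
+ * RecordSetWriterFactory stub that writes a fixed header followed by each record as a
+ * line of double-quoted, comma-separated values.
+ */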
+ private static class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
+ private final String header;
+
+ public MockRecordWriter(final String header) {
+ this.header = header;
+ }
+
+ @Override
+ public RecordSetWriter createWriter(final ComponentLog logger) {
+ return new RecordSetWriter() {
+ @Override
+ public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+ out.write(header.getBytes());
+ out.write("\n".getBytes());
+
+ int recordCount = 0;
+ final int numCols = rs.getSchema().getFieldCount();
+ Record record = null;
+ while ((record = rs.next()) != null) {
+ recordCount++;
+ int i = 0;
+ for (final String fieldName : record.getSchema().getFieldNames()) {
+ final String val = record.getAsString(fieldName);
+ out.write("\"".getBytes());
+ out.write(val.getBytes());
+ out.write("\"".getBytes());
+
+ if (i++ < numCols - 1) {
+ out.write(",".getBytes());
+ }
+ }
+ out.write("\n".getBytes());
+ }
+
+ return WriteResult.of(recordCount, Collections.emptyMap());
+ }
+
+ @Override
+ public String getMimeType() {
+ return "text/plain";
+ }
+
+ @Override
+ public WriteResult write(Record record, OutputStream out) throws IOException {
+ // Single-record writes are not exercised by these tests.
+ return null;
+ }
+ };
+ }
+ }
+
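+ /**
+ * RowRecordReaderFactory stub that serves a canned list of records. When failAfterN is
+ * non-negative, nextRecord() throws MalformedRecordException once that many records have
+ * been read, simulating unparseable input.
+ */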
+ private static class MockRecordParser extends AbstractControllerService implements RowRecordReaderFactory {
+ private final List<Object[]> records = new ArrayList<>();
+ private final List<RecordField> fields = new ArrayList<>();
+ private final int failAfterN;
+
+ public MockRecordParser() {
+ this(-1);
+ }
+
+ public MockRecordParser(final int failAfterN) {
+ this.failAfterN = failAfterN;
+ }
+
+
+ public void addSchemaField(final String fieldName, final RecordFieldType type) {
+ fields.add(new RecordField(fieldName, type.getDataType()));
+ }
+
+ public void addRecord(Object... values) {
+ records.add(values);
+ }
+
+ @Override
+ public RecordReader createRecordReader(InputStream in, ComponentLog logger) throws IOException {
+ final Iterator<Object[]> itr = records.iterator();
+
+ return new RecordReader() {
+ private int recordCount = 0;
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public Record nextRecord() throws IOException, MalformedRecordException {
+ // Simulate a parse failure once failAfterN records have been read (disabled when negative).
+ if (failAfterN > -1 && recordCount >= failAfterN) {
+ throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
+ }
+ recordCount++;
+
+ if (!itr.hasNext()) {
+ return null;
+ }
+
+ final Object[] values = itr.next();
+ final Map<String, Object> valueMap = new HashMap<>();
+ int i = 0;
+ for (final RecordField field : fields) {
+ final String fieldName = field.getFieldName();
+ valueMap.put(fieldName, values[i++]);
+ }
+
+ return new MapRecord(new SimpleRecordSchema(fields), valueMap);
+ }
+
+ @Override
+ public RecordSchema getSchema() {
+ return new SimpleRecordSchema(fields);
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv
deleted file mode 100644
index 2d56bb7..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv
+++ /dev/null
@@ -1,5 +0,0 @@
-ID:int,AMOUNT1: float,AMOUNT2:float,AMOUNT3:float
-008, 10.05, 15.45, 89.99
-100, 20.25, 25.25, 45.25
-105, 20.05, 25.05, 45.05
-200, 34.05, 25.05, 75.05
\ No newline at end of file