Posted to commits@nifi.apache.org by ze...@apache.org on 2018/11/23 09:32:14 UTC

nifi git commit: NIFI-5333 Added GetMongoRecord.

Repository: nifi
Updated Branches:
  refs/heads/master e34d653ba -> e603c486f


NIFI-5333 Added GetMongoRecord.

Signed-off-by: zenfenan <ze...@apache.org>

This closes #3011


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e603c486
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e603c486
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e603c486

Branch: refs/heads/master
Commit: e603c486f49f058d5f37892d172c1d6dddff55c7
Parents: e34d653
Author: Mike Thomsen <mi...@gmail.com>
Authored: Sun Sep 2 15:58:33 2018 -0400
Committer: zenfenan <si...@gmail.com>
Committed: Fri Nov 23 15:01:45 2018 +0530

----------------------------------------------------------------------
 .../record/util/DataTypeUtils.java              |   5 +
 .../nifi-mongodb-processors/pom.xml             |   1 +
 .../mongodb/AbstractMongoQueryProcessor.java    | 150 ++++++++++++++
 .../nifi/processors/mongodb/GetMongo.java       | 113 +---------
 .../nifi/processors/mongodb/GetMongoRecord.java | 205 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      |  59 ++++++
 .../processors/mongodb/GetMongoRecordIT.groovy  | 179 ++++++++++++++++
 8 files changed, 602 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 336a70d..f206a64 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -602,6 +602,11 @@ public class DataTypeUtils {
             return null;
         }
 
+        if (value instanceof java.util.Date) {
+            java.util.Date _temp = (java.util.Date)value;
+            return new Date(_temp.getTime());
+        }
+
         if (value instanceof Date) {
             return (Date) value;
         }
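
A note on this hunk: the MongoDB driver hands back BSON date fields as java.util.Date, which is not a java.sql.Date and so never matched the existing instanceof branch; the new check rewraps the epoch millis instead. A self-contained sketch of the same conversion (the class and method names below are illustrative, not part of DataTypeUtils):

    import java.sql.Date;

    class UtilDateConversionSketch {
        // java.sql.Date extends java.util.Date, so an already-converted
        // value passes through; anything else is rewrapped around the
        // same epoch milliseconds.
        static Date toSqlDate(java.util.Date value) {
            if (value instanceof Date) {
                return (Date) value;
            }
            return new Date(value.getTime());
        }

        public static void main(String[] args) {
            System.out.println(toSqlDate(new java.util.Date())); // prints today's date, e.g. 2018-11-23
        }
    }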

http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
index 576fb1a..2a2ae87 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -101,6 +101,7 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mongodb-client-service-api</artifactId>
+            <version>1.9.0-SNAPSHOT</version>
             <scope>compile</scope>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java
new file mode 100644
index 0000000..6660551
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.Document;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractMongoQueryProcessor extends AbstractMongoProcessor {
+    public static final String DB_NAME = "mongo.database.name";
+    public static final String COL_NAME = "mongo.collection.name";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles that have the results of a successful query execution go here.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All input FlowFiles that are part of a failed query execution go here.")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("All input FlowFiles that are part of a successful query execution go here.")
+            .build();
+
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+            .name("Query")
+            .description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" +
+                    " an incoming connection from another processor to provide the query as a valid JSON document inside of " +
+                    "the FlowFile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
+                    "that will result in a full collection fetch using a \"{}\" query.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(JsonValidator.INSTANCE)
+            .build();
+
+    public static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
+            .name("Projection")
+            .description("The fields to be returned from the documents in the result set; must be a valid BSON document")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(JsonValidator.INSTANCE)
+            .build();
+
+    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
+            .name("Sort")
+            .description("The fields by which to sort; must be a valid BSON document")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(JsonValidator.INSTANCE)
+            .build();
+
+    public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("Limit")
+            .description("The maximum number of elements to return")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The number of elements to be returned from the server in one batch")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
+            .name("results-per-flowfile")
+            .displayName("Results Per FlowFile")
+            .description("How many results to put into a FlowFile at once. The whole body will be treated as a JSON array of results.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    protected Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
+        Document query = null;
+        if (context.getProperty(QUERY).isSet()) {
+            query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue());
+        } else if (!context.getProperty(QUERY).isSet() && input == null) {
+            query = Document.parse("{}");
+        } else {
+            try {
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                session.exportTo(input, out);
+                out.close();
+                query = Document.parse(new String(out.toByteArray()));
+            } catch (Exception ex) {
+                getLogger().error("Error reading FlowFile", ex);
+                if (input != null) { //Likely culprit is a bad query
+                    session.transfer(input, REL_FAILURE);
+                    session.commit();
+                } else {
+                    throw new ProcessException(ex);
+                }
+            }
+        }
+
+        return query;
+    }
+
+    protected Map<String, String> getAttributes(ProcessContext context, FlowFile input, Document query, MongoCollection collection) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+
+        if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
+            final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
+            attributes.put(queryAttr, query.toJson());
+        }
+
+        attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
+        attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
+
+        return attributes;
+    }
+}
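
The getQuery helper above is the logic that GetMongo and GetMongoRecord now share. Stripped of the NiFi session plumbing, its precedence is: the Query property wins; with no property and no incoming FlowFile, an empty query fetches the whole collection; otherwise the FlowFile body is parsed as the query. A condensed sketch of that precedence (plain strings stand in for the NiFi types):

    import org.bson.Document;

    class QueryPrecedenceSketch {
        static Document resolve(String queryProperty, String flowFileBody) {
            if (queryProperty != null) {
                return Document.parse(queryProperty);   // the Query property takes precedence
            }
            if (flowFileBody == null) {
                return Document.parse("{}");            // timer-driven: full collection fetch
            }
            return Document.parse(flowFileBody);        // query supplied in the FlowFile body
        }
    }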

http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index b8d2f8d..ff65f86 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -32,25 +32,19 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.JsonValidator;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.bson.Document;
 import org.bson.json.JsonWriterSettings;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -63,75 +57,8 @@ import java.util.Set;
     @WritesAttribute(attribute = GetMongo.DB_NAME, description = "The database where the results came from."),
     @WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.")
 })
-public class GetMongo extends AbstractMongoProcessor {
-    static final String DB_NAME = "mongo.database.name";
-    static final String COL_NAME = "mongo.collection.name";
+public class GetMongo extends AbstractMongoQueryProcessor {
 
-    static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("All FlowFiles that have the results of a successful query execution go here.")
-            .build();
-
-    static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("All input FlowFiles that are part of a failed query execution go here.")
-            .build();
-
-    static final Relationship REL_ORIGINAL = new Relationship.Builder()
-            .name("original")
-            .description("All input FlowFiles that are part of a successful query execution go here.")
-            .build();
-
-    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
-        .name("Query")
-        .description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" +
-                " an incoming connection from another processor to provide the query as a valid JSON document inside of " +
-                "the FlowFile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
-                "that will result in a full collection fetch using a \"{}\" query.")
-        .required(false)
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-        .addValidator(JsonValidator.INSTANCE)
-        .build();
-
-    static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
-            .name("Projection")
-            .description("The fields to be returned from the documents in the result set; must be a valid BSON document")
-            .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(JsonValidator.INSTANCE)
-            .build();
-
-    static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
-            .name("Sort")
-            .description("The fields by which to sort; must be a valid BSON document")
-            .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(JsonValidator.INSTANCE)
-            .build();
-
-    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
-            .name("Limit")
-            .description("The maximum number of elements to return")
-            .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-
-    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Batch Size")
-            .description("The number of elements to be returned from the server in one batch")
-            .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
-            .name("results-per-flowfile")
-            .displayName("Results Per FlowFile")
-            .description("How many results to put into a FlowFile at once. The whole body will be treated as a JSON array of results.")
-            .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
 
     static final AllowableValue YES_PP = new AllowableValue("true", "True");
     static final AllowableValue NO_PP  = new AllowableValue("false", "False");
@@ -231,14 +158,7 @@ public class GetMongo extends AbstractMongoProcessor {
         final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
         final String usePrettyPrint  = context.getProperty(USE_PRETTY_PRINTING).getValue();
         final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
-        final Map<String, String> attributes = new HashMap<>();
-
-        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
 
-        if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
-            final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
-            attributes.put(queryAttr, query.toJson());
-        }
 
         final Document projection = context.getProperty(PROJECTION).isSet()
                 ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null;
@@ -250,9 +170,7 @@ public class GetMongo extends AbstractMongoProcessor {
 
         final MongoCollection<Document> collection = getCollection(context, input);
         final FindIterable<Document> it = collection.find(query);
-
-        attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
-        attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
+        final Map<String, String> attributes = getAttributes(context, input, query, collection);
 
         if (projection != null) {
             it.projection(projection);
@@ -319,31 +237,4 @@ public class GetMongo extends AbstractMongoProcessor {
         }
 
     }
-
-    private Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
-        Document query = null;
-        if (context.getProperty(QUERY).isSet()) {
-            query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue());
-        } else if (!context.getProperty(QUERY).isSet() && input == null) {
-            query = Document.parse("{}");
-        } else {
-            try {
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
-                session.exportTo(input, out);
-                out.close();
-                query = Document.parse(new String(out.toByteArray()));
-            } catch (Exception ex) {
-                logger.error("Error reading FlowFile : ", ex);
-                if (input != null) { //Likely culprit is a bad query
-                    session.transfer(input, REL_FAILURE);
-                    session.commit();
-                } else {
-                    throw new ProcessException(ex);
-                }
-            }
-        }
-
-        return query;
-    }
-
 }
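
The net effect of this GetMongo hunk is a pull-up refactor rather than a behavior change: the shared relationships, the query-shaping properties, and the getQuery/getAttributes helpers move into the new base class. In outline (bodies elided):

    // Shared query plumbing now lives in the base class; GetMongo keeps
    // only its JSON output concerns (pretty printing, JSON type, charset).
    abstract class AbstractMongoQueryProcessor extends AbstractMongoProcessor { /* shared members */ }
    class GetMongo extends AbstractMongoQueryProcessor { /* JSON output options */ }
    class GetMongoRecord extends AbstractMongoQueryProcessor { /* record writer output */ }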

http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java
new file mode 100644
index 0000000..49878ba
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.bson.Document;
+import org.bson.types.ObjectId;
+
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@CapabilityDescription("A record-based version of GetMongo that uses the Record writers to write the MongoDB result set.")
+@Tags({"mongo", "mongodb", "get", "fetch", "record", "json"})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@WritesAttributes({
+    @WritesAttribute(attribute = GetMongo.DB_NAME, description = "The database where the results came from."),
+    @WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.")
+})
+public class GetMongoRecord extends AbstractMongoQueryProcessor {
+    public static final PropertyDescriptor WRITER_FACTORY = new PropertyDescriptor.Builder()
+        .name("get-mongo-record-writer-factory")
+        .displayName("Record Writer")
+        .description("The record writer to use to write the result sets.")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+        .name("mongodb-schema-name")
+        .displayName("Schema Name")
+        .description("The name of the schema in the configured schema registry to use for the query results.")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .defaultValue("${schema.name}")
+        .required(true)
+        .build();
+
+    private static final List<PropertyDescriptor> DESCRIPTORS;
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        List<PropertyDescriptor> _temp = new ArrayList<>();
+        _temp.add(CLIENT_SERVICE);
+        _temp.add(WRITER_FACTORY);
+        _temp.add(DATABASE_NAME);
+        _temp.add(COLLECTION_NAME);
+        _temp.add(SCHEMA_NAME);
+        _temp.add(QUERY_ATTRIBUTE);
+        _temp.add(QUERY);
+        _temp.add(PROJECTION);
+        _temp.add(SORT);
+        _temp.add(LIMIT);
+        _temp.add(BATCH_SIZE);
+
+        DESCRIPTORS = Collections.unmodifiableList(_temp);
+
+        Set<Relationship> _rels = new HashSet<>();
+        _rels.add(REL_SUCCESS);
+        _rels.add(REL_FAILURE);
+        _rels.add(REL_ORIGINAL);
+        RELATIONSHIPS = Collections.unmodifiableSet(_rels);
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    private volatile MongoDBClientService clientService;
+    private volatile RecordSetWriterFactory writerFactory;
+
+    @OnScheduled
+    public void onEnabled(ProcessContext context) {
+        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
+        writerFactory = context.getProperty(WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile input = null;
+
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        final String database = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(input).getValue();
+        final String collection = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(input).getValue();
+        final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(input).getValue();
+        final Document query = getQuery(context, session, input);
+
+        MongoCollection mongoCollection = clientService.getDatabase(database).getCollection(collection);
+
+        FindIterable<Document> find = mongoCollection.find(query);
+        if (context.getProperty(SORT).isSet()) {
+            find = find.sort(Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()));
+        }
+        if (context.getProperty(PROJECTION).isSet()) {
+            find = find.projection(Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()));
+        }
+        if (context.getProperty(LIMIT).isSet()) {
+            find = find.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
+        }
+
+        MongoCursor<Document> cursor = find.iterator();
+
+        FlowFile output = input != null ? session.create(input) : session.create();
+        final FlowFile inputPtr = input;
+        try {
+            final Map<String, String> attributes = getAttributes(context, input, query, mongoCollection);
+            try (OutputStream out = session.write(output)) {
+                Map<String, String> attrs = inputPtr != null ? inputPtr.getAttributes() : new HashMap<String, String>(){{
+                    put("schema.name", schemaName);
+                }};
+                RecordSchema schema = writerFactory.getSchema(attrs, null);
+                RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out);
+                long count = 0L;
+                writer.beginRecordSet();
+                while (cursor.hasNext()) {
+                    Document next = cursor.next();
+                    if (next.get("_id") instanceof ObjectId) {
+                        next.put("_id", next.get("_id").toString());
+                    }
+                    Record record = new MapRecord(schema, next);
+                    writer.write(record);
+                    count++;
+                }
+                writer.finishRecordSet();
+                writer.close();
+                out.close();
+                attributes.put("record.count", String.valueOf(count));
+            } catch (SchemaNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+
+
+            output = session.putAllAttributes(output, attributes);
+
+            session.getProvenanceReporter().fetch(output, getURI(context));
+            session.transfer(output, REL_SUCCESS);
+            if (input != null) {
+                session.transfer(input, REL_ORIGINAL);
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            getLogger().error("Error writing record set from Mongo query.", ex);
+            session.remove(output);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+        }
+    }
+}
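
Two details in onTrigger above are easy to miss: when running standalone, the Schema Name property value is injected as a synthetic schema.name attribute so the writer factory can still resolve a schema, and ObjectId values under _id are stringified because the record API has no native ObjectId type. A small illustration of the latter (the sample document is made up):

    import org.bson.Document;
    import org.bson.types.ObjectId;

    class ObjectIdSketch {
        // Record fields hold plain values; without this, _id would be
        // written as a nested object rather than the familiar hex string.
        static Document normalizeId(Document doc) {
            if (doc.get("_id") instanceof ObjectId) {
                doc.put("_id", doc.get("_id").toString());
            }
            return doc;
        }

        public static void main(String[] args) {
            Document doc = new Document("_id", new ObjectId()).append("name", "Jane Doe");
            System.out.println(normalizeId(doc).toJson()); // e.g. {"_id": "5bf7...", "name": "Jane Doe"}
        }
    }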

http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9e7bc08..bfe2b74 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,6 +15,7 @@
 
 org.apache.nifi.processors.mongodb.DeleteMongo
 org.apache.nifi.processors.mongodb.GetMongo
+org.apache.nifi.processors.mongodb.GetMongoRecord
 org.apache.nifi.processors.mongodb.RunMongoAggregation
 org.apache.nifi.processors.mongodb.PutMongo
 org.apache.nifi.processors.mongodb.PutMongoRecord
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html
new file mode 100644
index 0000000..2a686cf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html
@@ -0,0 +1,59 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>GetMongoRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+<h2>Description:</h2>
+<p>
+    This processor runs queries against a MongoDB instance or cluster and writes the results to a FlowFile. It accepts
+    incoming FlowFiles, but can also run standalone. It is a record-aware version of the <em>GetMongo</em> processor.
+</p>
+<h2>Specifying the Query</h2>
+<p>
+    The query can be specified in one of three ways:
+</p>
+<ul>
+    <li>Query configuration property.</li>
+    <li>Query Attribute configuration property.</li>
+    <li>FlowFile content.</li>
+</ul>
+<p>
+    If a value is specified in either of the configuration properties, the processor will not look in the FlowFile
+    content for a query.
+</p>
+<h2>Limiting/Shaping Results</h2>
+<p>
+    The following options for limiting/shaping results are available:
+</p>
+<ul>
+    <li>Limit - limit the number of results. This should not be confused with the "batch size" option, which is a
+        setting for the underlying MongoDB driver that tells it how many items to retrieve in each poll of the server.</li>
+    <li>Sort - sort the result set. Requires a JSON document like <em>{ "someDate": -1 }</em></li>
+    <li>Projection - control which fields to return. Example (this would remove <em>_id</em>): <em>{ "_id": 0 }</em></li>
+</ul>
+<h2>Misc Options</h2>
+<p>
+    Results Per FlowFile, if set, creates a JSON array out of a batch of results and writes the result to the output.
+    Pretty Print, if enabled, will format the JSON data to be easily read by a human (e.g. proper indentation of fields).
+</p>
+</body>
+</html>
\ No newline at end of file
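
To make the limiting/shaping options in the documentation above concrete, this is roughly how they combine on the driver's cursor when all of them are set (the query and field values are invented for illustration):

    import com.mongodb.client.FindIterable;
    import com.mongodb.client.MongoCollection;
    import org.bson.Document;

    class ShapingSketch {
        // Query first, then Sort, Projection and Limit, in the same order
        // GetMongoRecord applies them to the FindIterable.
        static FindIterable<Document> shaped(MongoCollection<Document> users) {
            return users.find(Document.parse("{\"failedLogins\": {\"$gte\": 2}}"))
                    .sort(Document.parse("{\"lastLogin\": -1}"))                  // newest logins first
                    .projection(Document.parse("{\"name\": 1, \"failedLogins\": 1}"))
                    .limit(100);
        }
    }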

http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy
new file mode 100644
index 0000000..fe21977
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb
+
+import groovy.json.JsonSlurper
+import org.apache.nifi.flowfile.attributes.CoreAttributes
+import org.apache.nifi.json.JsonRecordSetWriter
+import org.apache.nifi.mongodb.MongoDBClientService
+import org.apache.nifi.mongodb.MongoDBControllerService
+import org.apache.nifi.schema.access.SchemaAccessUtils
+import org.apache.nifi.serialization.DateTimeUtils
+import org.apache.nifi.serialization.SimpleRecordSchema
+import org.apache.nifi.serialization.record.*
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.bson.Document
+import org.junit.After
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+import static groovy.json.JsonOutput.*
+
+class GetMongoRecordIT {
+    TestRunner runner
+    MongoDBClientService service
+
+    static RecordSchema SCHEMA
+    static final String DB_NAME = GetMongoRecord.class.simpleName + Calendar.instance.timeInMillis
+    static final String COL_NAME = "test"
+    static final String URI = "mongodb://localhost:27017"
+
+    static {
+        def fields = [
+            new RecordField("name", RecordFieldType.STRING.dataType),
+            new RecordField("failedLogins", RecordFieldType.INT.dataType),
+            new RecordField("lastLogin", RecordFieldType.DATE.dataType)
+        ]
+        SCHEMA = new SimpleRecordSchema(fields, new StandardSchemaIdentifier.Builder().name("sample").build())
+    }
+
+    static final List<Map> SAMPLES = [
+        [ name: "John Smith", failedLogins: 2, lastLogin: Calendar.instance.time ],
+        [ name: "Jane Doe", failedLogins: 1, lastLogin: Calendar.instance.time - 360000 ],
+        [ name: "John Brown", failedLogins: 4, lastLogin: Calendar.instance.time - 10000 ]
+    ].collect { new Document(it) }
+
+    @Before
+    void setup() {
+        runner = TestRunners.newTestRunner(GetMongoRecord.class)
+        service = new MongoDBControllerService()
+        runner.addControllerService("client", service)
+        runner.setProperty(service, MongoDBControllerService.URI, URI)
+        runner.enableControllerService(service)
+
+        def writer = new JsonRecordSetWriter()
+        def registry = new MockSchemaRegistry()
+        registry.addSchema("sample", SCHEMA)
+
+        runner.addControllerService("writer", writer)
+        runner.addControllerService("registry", registry)
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY)
+        runner.setProperty(writer, DateTimeUtils.DATE_FORMAT, "yyyy")
+        runner.enableControllerService(registry)
+        runner.enableControllerService(writer)
+
+        runner.setProperty(GetMongoRecord.DATABASE_NAME, DB_NAME)
+        runner.setProperty(GetMongoRecord.COLLECTION_NAME, COL_NAME)
+        runner.setProperty(GetMongoRecord.CLIENT_SERVICE, "client")
+        runner.setProperty(GetMongoRecord.WRITER_FACTORY, "writer")
+
+        service.getDatabase(DB_NAME).getCollection(COL_NAME).insertMany(SAMPLES)
+    }
+
+    @After
+    void after() {
+        service.getDatabase(DB_NAME).drop()
+    }
+
+    @Test
+    void testLookup() {
+        def ffValidator = { TestRunner runner ->
+            def ffs = runner.getFlowFilesForRelationship(GetMongoRecord.REL_SUCCESS)
+            Assert.assertNotNull(ffs)
+            Assert.assertTrue(ffs.size() == 1)
+            Assert.assertEquals("3", ffs[0].getAttribute("record.count"))
+            Assert.assertEquals("application/json", ffs[0].getAttribute(CoreAttributes.MIME_TYPE.key()))
+            Assert.assertEquals(COL_NAME, ffs[0].getAttribute(GetMongoRecord.COL_NAME))
+            Assert.assertEquals(DB_NAME, ffs[0].getAttribute(GetMongoRecord.DB_NAME))
+            Assert.assertEquals(Document.parse("{}"), Document.parse(ffs[0].getAttribute("executed.query")))
+        }
+
+        runner.setProperty(GetMongoRecord.QUERY_ATTRIBUTE, "executed.query")
+        runner.setProperty(GetMongoRecord.QUERY, "{}")
+        runner.enqueue("", [ "schema.name": "sample"])
+        runner.run()
+
+        runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(GetMongoRecord.REL_ORIGINAL, 1)
+
+        ffValidator(runner)
+
+        runner.clearTransferState()
+        runner.removeProperty(GetMongoRecord.QUERY)
+        runner.enqueue("{}", [ "schema.name": "sample"])
+        runner.run()
+
+        runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(GetMongoRecord.REL_ORIGINAL, 1)
+
+        ffValidator(runner)
+    }
+
+    @Test
+    void testSortAndProjection() {
+        runner.setIncomingConnection(false)
+        runner.setVariable("schema.name", "sample")
+        runner.setProperty(GetMongoRecord.SORT, toJson([failedLogins: 1]))
+        runner.setProperty(GetMongoRecord.PROJECTION, toJson([failedLogins: 1]))
+        runner.setProperty(GetMongoRecord.QUERY, "{}")
+        runner.run()
+
+        def parsed = sharedTest()
+        Assert.assertEquals(3, parsed.size())
+        def values = [1, 2, 4]
+        int index = 0
+        parsed.each {
+            Assert.assertEquals(values[index++], it["failedLogins"])
+            Assert.assertNull(it["name"])
+            Assert.assertNull(it["lastLogin"])
+        }
+    }
+
+    List<Map<String, Object>> sharedTest() {
+        runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
+
+        def ff = runner.getFlowFilesForRelationship(GetMongoRecord.REL_SUCCESS)[0]
+        def raw = runner.getContentAsByteArray(ff)
+        String content = new String(raw)
+        def parsed = new JsonSlurper().parseText(content)
+        Assert.assertNotNull(parsed)
+
+        parsed
+    }
+
+    @Test
+    void testLimit() {
+        runner.setIncomingConnection(false)
+        runner.setProperty(GetMongoRecord.LIMIT, "1")
+        runner.setProperty(GetMongoRecord.QUERY, "{}")
+        runner.setVariable("schema.name", "sample")
+        runner.run()
+
+        def parsed = sharedTest()
+        Assert.assertEquals(1, parsed.size())
+
+    }
+}