You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ze...@apache.org on 2018/11/23 09:32:14 UTC
nifi git commit: NIFI-5333 Added GetMongoRecord.
Repository: nifi
Updated Branches:
refs/heads/master e34d653ba -> e603c486f
NIFI-5333 Added GetMongoRecord.
Signed-off-by: zenfenan <ze...@apache.org>
This closes #3011
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e603c486
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e603c486
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e603c486
Branch: refs/heads/master
Commit: e603c486f49f058d5f37892d172c1d6dddff55c7
Parents: e34d653
Author: Mike Thomsen <mi...@gmail.com>
Authored: Sun Sep 2 15:58:33 2018 -0400
Committer: zenfenan <si...@gmail.com>
Committed: Fri Nov 23 15:01:45 2018 +0530
----------------------------------------------------------------------
.../record/util/DataTypeUtils.java | 5 +
.../nifi-mongodb-processors/pom.xml | 1 +
.../mongodb/AbstractMongoQueryProcessor.java | 150 ++++++++++++++
.../nifi/processors/mongodb/GetMongo.java | 113 +---------
.../nifi/processors/mongodb/GetMongoRecord.java | 205 +++++++++++++++++++
.../org.apache.nifi.processor.Processor | 1 +
.../additionalDetails.html | 59 ++++++
.../processors/mongodb/GetMongoRecordIT.groovy | 179 ++++++++++++++++
8 files changed, 602 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 336a70d..f206a64 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -602,6 +602,11 @@ public class DataTypeUtils {
return null;
}
+ if (value instanceof java.util.Date) {
+ java.util.Date _temp = (java.util.Date)value;
+ return new Date(_temp.getTime());
+ }
+
if (value instanceof Date) {
return (Date) value;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
index 576fb1a..2a2ae87 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -101,6 +101,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-client-service-api</artifactId>
+ <version>1.9.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java
new file mode 100644
index 0000000..6660551
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.Document;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractMongoQueryProcessor extends AbstractMongoProcessor {
+ public static final String DB_NAME = "mongo.database.name";
+ public static final String COL_NAME = "mongo.collection.name";
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles that have the results of a successful query execution go here.")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("All input FlowFiles that are part of a failed query execution go here.")
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("All input FlowFiles that are part of a successful query execution go here.")
+ .build();
+
+ public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+ .name("Query")
+ .description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" +
+ " an incoming connection from another processor to provide the query as a valid JSON document inside of " +
+ "the FlowFile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
+ "that will result in a full collection fetch using a \"{}\" query.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(JsonValidator.INSTANCE)
+ .build();
+
+ public static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
+ .name("Projection")
+ .description("The fields to be returned from the documents in the result set; must be a valid BSON document")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(JsonValidator.INSTANCE)
+ .build();
+
+ public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
+ .name("Sort")
+ .description("The fields by which to sort; must be a valid BSON document")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(JsonValidator.INSTANCE)
+ .build();
+
+ public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+ .name("Limit")
+ .description("The maximum number of elements to return")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("The number of elements to be returned from the server in one batch")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
+ .name("results-per-flowfile")
+ .displayName("Results Per FlowFile")
+ .description("How many results to put into a FlowFile at once. The whole body will be treated as a JSON array of results.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ protected Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
+ Document query = null;
+ if (context.getProperty(QUERY).isSet()) {
+ query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue());
+ } else if (!context.getProperty(QUERY).isSet() && input == null) {
+ query = Document.parse("{}");
+ } else {
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ session.exportTo(input, out);
+ out.close();
+ query = Document.parse(new String(out.toByteArray()));
+ } catch (Exception ex) {
+ getLogger().error("Error reading FlowFile : ", ex);
+ if (input != null) { //Likely culprit is a bad query
+ session.transfer(input, REL_FAILURE);
+ session.commit();
+ } else {
+ throw new ProcessException(ex);
+ }
+ }
+ }
+
+ return query;
+ }
+
+ protected Map<String, String> getAttributes(ProcessContext context, FlowFile input, Document query, MongoCollection collection) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+
+ if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
+ final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
+ attributes.put(queryAttr, query.toJson());
+ }
+
+ attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
+ attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
+
+ return attributes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index b8d2f8d..ff65f86 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -32,25 +32,19 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.JsonValidator;
-import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import org.bson.json.JsonWriterSettings;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -63,75 +57,8 @@ import java.util.Set;
@WritesAttribute(attribute = GetMongo.DB_NAME, description = "The database where the results came from."),
@WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.")
})
-public class GetMongo extends AbstractMongoProcessor {
- static final String DB_NAME = "mongo.database.name";
- static final String COL_NAME = "mongo.collection.name";
+public class GetMongo extends AbstractMongoQueryProcessor {
- static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All FlowFiles that have the results of a successful query execution go here.")
- .build();
-
- static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("All input FlowFiles that are part of a failed query execution go here.")
- .build();
-
- static final Relationship REL_ORIGINAL = new Relationship.Builder()
- .name("original")
- .description("All input FlowFiles that are part of a successful query execution go here.")
- .build();
-
- static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
- .name("Query")
- .description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" +
- " an incoming connection from another processor to provide the query as a valid JSON document inside of " +
- "the FlowFile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
- "that will result in a full collection fetch using a \"{}\" query.")
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(JsonValidator.INSTANCE)
- .build();
-
- static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
- .name("Projection")
- .description("The fields to be returned from the documents in the result set; must be a valid BSON document")
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(JsonValidator.INSTANCE)
- .build();
-
- static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
- .name("Sort")
- .description("The fields by which to sort; must be a valid BSON document")
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(JsonValidator.INSTANCE)
- .build();
-
- static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
- .name("Limit")
- .description("The maximum number of elements to return")
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .build();
-
- static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Batch Size")
- .description("The number of elements to be returned from the server in one batch")
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .build();
- static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
- .name("results-per-flowfile")
- .displayName("Results Per FlowFile")
- .description("How many results to put into a FlowFile at once. The whole body will be treated as a JSON array of results.")
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .build();
static final AllowableValue YES_PP = new AllowableValue("true", "True");
static final AllowableValue NO_PP = new AllowableValue("false", "False");
@@ -231,14 +158,7 @@ public class GetMongo extends AbstractMongoProcessor {
final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
- final Map<String, String> attributes = new HashMap<>();
-
- attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
- if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
- final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
- attributes.put(queryAttr, query.toJson());
- }
final Document projection = context.getProperty(PROJECTION).isSet()
? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null;
@@ -250,9 +170,7 @@ public class GetMongo extends AbstractMongoProcessor {
final MongoCollection<Document> collection = getCollection(context, input);
final FindIterable<Document> it = collection.find(query);
-
- attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
- attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
+ final Map<String, String> attributes = getAttributes(context, input, query, collection);
if (projection != null) {
it.projection(projection);
@@ -319,31 +237,4 @@ public class GetMongo extends AbstractMongoProcessor {
}
}
-
- private Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
- Document query = null;
- if (context.getProperty(QUERY).isSet()) {
- query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue());
- } else if (!context.getProperty(QUERY).isSet() && input == null) {
- query = Document.parse("{}");
- } else {
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- session.exportTo(input, out);
- out.close();
- query = Document.parse(new String(out.toByteArray()));
- } catch (Exception ex) {
- logger.error("Error reading FlowFile : ", ex);
- if (input != null) { //Likely culprit is a bad query
- session.transfer(input, REL_FAILURE);
- session.commit();
- } else {
- throw new ProcessException(ex);
- }
- }
- }
-
- return query;
- }
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java
new file mode 100644
index 0000000..49878ba
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.bson.Document;
+import org.bson.types.ObjectId;
+
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@CapabilityDescription("A record-based version of GetMongo that uses the Record writers to write the MongoDB result set.")
+@Tags({"mongo", "mongodb", "get", "fetch", "record", "json"})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@WritesAttributes({
+ @WritesAttribute(attribute = GetMongo.DB_NAME, description = "The database where the results came from."),
+ @WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.")
+})
+public class GetMongoRecord extends AbstractMongoQueryProcessor {
+ public static final PropertyDescriptor WRITER_FACTORY = new PropertyDescriptor.Builder()
+ .name("get-mongo-record-writer-factory")
+ .displayName("Record Writer")
+ .description("The record writer to use to write the result sets.")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+ public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+ .name("mongodb-schema-name")
+ .displayName("Schema Name")
+ .description("The name of the schema in the configured schema registry to use for the query results.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue("${schema.name}")
+ .required(true)
+ .build();
+
+ private static final List<PropertyDescriptor> DESCRIPTORS;
+ private static final Set<Relationship> RELATIONSHIPS;
+
+ static {
+ List<PropertyDescriptor> _temp = new ArrayList<>();
+ _temp.add(CLIENT_SERVICE);
+ _temp.add(WRITER_FACTORY);
+ _temp.add(DATABASE_NAME);
+ _temp.add(COLLECTION_NAME);
+ _temp.add(SCHEMA_NAME);
+ _temp.add(QUERY_ATTRIBUTE);
+ _temp.add(QUERY);
+ _temp.add(PROJECTION);
+ _temp.add(SORT);
+ _temp.add(LIMIT);
+ _temp.add(BATCH_SIZE);
+
+ DESCRIPTORS = Collections.unmodifiableList(_temp);
+
+ Set<Relationship> _rels = new HashSet<>();
+ _rels.add(REL_SUCCESS);
+ _rels.add(REL_FAILURE);
+ _rels.add(REL_ORIGINAL);
+ RELATIONSHIPS = Collections.unmodifiableSet(_rels);
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ private volatile MongoDBClientService clientService;
+ private volatile RecordSetWriterFactory writerFactory;
+
+ @OnScheduled
+ public void onEnabled(ProcessContext context) {
+ clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
+ writerFactory = context.getProperty(WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile input = null;
+
+ if (context.hasIncomingConnection()) {
+ input = session.get();
+ if (input == null && context.hasNonLoopConnection()) {
+ return;
+ }
+ }
+
+ final String database = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(input).getValue();
+ final String collection = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(input).getValue();
+ final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(input).getValue();
+ final Document query = getQuery(context, session, input);
+
+ MongoCollection mongoCollection = clientService.getDatabase(database).getCollection(collection);
+
+ FindIterable<Document> find = mongoCollection.find(query);
+ if (context.getProperty(SORT).isSet()) {
+ find = find.sort(Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()));
+ }
+ if (context.getProperty(PROJECTION).isSet()) {
+ find = find.projection(Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()));
+ }
+ if (context.getProperty(LIMIT).isSet()) {
+ find = find.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
+ }
+
+ MongoCursor<Document> cursor = find.iterator();
+
+ FlowFile output = input != null ? session.create(input) : session.create();
+ final FlowFile inputPtr = input;
+ try {
+ final Map<String, String> attributes = getAttributes(context, input, query, mongoCollection);
+ try (OutputStream out = session.write(output)) {
+ Map<String, String> attrs = inputPtr != null ? inputPtr.getAttributes() : new HashMap<String, String>(){{
+ put("schema.name", schemaName);
+ }};
+ RecordSchema schema = writerFactory.getSchema(attrs, null);
+ RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out);
+ long count = 0L;
+ writer.beginRecordSet();
+ while (cursor.hasNext()) {
+ Document next = cursor.next();
+ if (next.get("_id") instanceof ObjectId) {
+ next.put("_id", next.get("_id").toString());
+ }
+ Record record = new MapRecord(schema, next);
+ writer.write(record);
+ count++;
+ }
+ writer.finishRecordSet();
+ writer.close();
+ out.close();
+ attributes.put("record.count", String.valueOf(count));
+ } catch (SchemaNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+
+ output = session.putAllAttributes(output, attributes);
+
+ session.getProvenanceReporter().fetch(output, getURI(context));
+ session.transfer(output, REL_SUCCESS);
+ if (input != null) {
+ session.transfer(input, REL_ORIGINAL);
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ getLogger().error("Error writing record set from Mongo query.", ex);
+ session.remove(output);
+ if (input != null) {
+ session.transfer(input, REL_FAILURE);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9e7bc08..bfe2b74 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,6 +15,7 @@
org.apache.nifi.processors.mongodb.DeleteMongo
org.apache.nifi.processors.mongodb.GetMongo
+org.apache.nifi.processors.mongodb.GetMongoRecord
org.apache.nifi.processors.mongodb.RunMongoAggregation
org.apache.nifi.processors.mongodb.PutMongo
org.apache.nifi.processors.mongodb.PutMongoRecord
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html
new file mode 100644
index 0000000..2a686cf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html
@@ -0,0 +1,59 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>GetMongoRecord</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+<h2>Description:</h2>
+<p>
+ This processor runs queries against a MongoDB instance or cluster and writes the results to a flowfile. It allows
+ input, but can run standalone as well. It is a record-aware version of the <em>GetMongo</em> processor.
+</p>
+<h2>Specifying the Query</h2>
+<p>
+ The query can be specified in one of three ways:
+</p>
+<ul>
+ <li>Query configuration property.</li>
+ <li>Query Attribute configuration property.</li>
+ <li>FlowFile content.</li>
+</ul>
+<p>
+ If a value is specified in either of the configuration properties, it will not look in the FlowFile content for a
+ query.
+</p>
+<h2>Limiting/Shaping Results</h2>
+<p>
+ The following options for limiting/shaping results are available:
+</p>
+<ul>
+ <li>Limit - limit the number of results. This should not be confused with the "batch size" option which is a
+ setting for the underlying MongoDB driver to tell it how many items to retrieve in each poll of the server.</li>
+ <li>Sort - sort the result set. Requires a JSON document like <em>{ "someDate": -1 }</em></li>
+    <li>Projection - control which fields to return. Example, which would remove <em>_id</em>: <em>{ "_id": 0 }</em></li>
+</ul>
+<h2>Misc Options</h2>
+<p>
+    Results Per FlowFile, if set, creates a JSON array out of a batch of results and writes the result to the output.
+    Pretty Print, if enabled, will format the JSON data to be easily read by a human (ex. proper indentation of fields).
+</p>
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy
new file mode 100644
index 0000000..fe21977
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb
+
+import groovy.json.JsonSlurper
+import org.apache.nifi.flowfile.attributes.CoreAttributes
+import org.apache.nifi.json.JsonRecordSetWriter
+import org.apache.nifi.mongodb.MongoDBClientService
+import org.apache.nifi.mongodb.MongoDBControllerService
+import org.apache.nifi.schema.access.SchemaAccessUtils
+import org.apache.nifi.serialization.DateTimeUtils
+import org.apache.nifi.serialization.SimpleRecordSchema
+import org.apache.nifi.serialization.record.*
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.bson.Document
+import org.junit.After
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+import static groovy.json.JsonOutput.*
+
+class GetMongoRecordIT {
+ TestRunner runner
+ MongoDBClientService service
+
+ static RecordSchema SCHEMA
+ static final String DB_NAME = GetMongoRecord.class.simpleName + Calendar.instance.timeInMillis
+ static final String COL_NAME = "test"
+ static final String URI = "mongodb://localhost:27017"
+
+ static {
+ def fields = [
+ new RecordField("name", RecordFieldType.STRING.dataType),
+ new RecordField("failedLogins", RecordFieldType.INT.dataType),
+ new RecordField("lastLogin", RecordFieldType.DATE.dataType)
+ ]
+ SCHEMA = new SimpleRecordSchema(fields, new StandardSchemaIdentifier.Builder().name("sample").build())
+ }
+
+ static final List<Map> SAMPLES = [
+ [ name: "John Smith", failedLogins: 2, lastLogin: Calendar.instance.time ],
+ [ name: "Jane Doe", failedLogins: 1, lastLogin: Calendar.instance.time - 360000 ],
+ [ name: "John Brown", failedLogins: 4, lastLogin: Calendar.instance.time - 10000 ]
+ ].collect { new Document(it) }
+
+ @Before
+ void setup() {
+ runner = TestRunners.newTestRunner(GetMongoRecord.class)
+ service = new MongoDBControllerService()
+ runner.addControllerService("client", service)
+ runner.setProperty(service, MongoDBControllerService.URI, URI)
+ runner.enableControllerService(service)
+
+ def writer = new JsonRecordSetWriter()
+ def registry = new MockSchemaRegistry()
+ registry.addSchema("sample", SCHEMA)
+
+ runner.addControllerService("writer", writer)
+ runner.addControllerService("registry", registry)
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY)
+ runner.setProperty(writer, DateTimeUtils.DATE_FORMAT, "yyyy")
+ runner.enableControllerService(registry)
+ runner.enableControllerService(writer)
+
+ runner.setProperty(GetMongoRecord.DATABASE_NAME, DB_NAME)
+ runner.setProperty(GetMongoRecord.COLLECTION_NAME, COL_NAME)
+ runner.setProperty(GetMongoRecord.CLIENT_SERVICE, "client")
+ runner.setProperty(GetMongoRecord.WRITER_FACTORY, "writer")
+
+ service.getDatabase(DB_NAME).getCollection(COL_NAME).insertMany(SAMPLES)
+ }
+
+ @After
+ void after() {
+ service.getDatabase(DB_NAME).drop()
+ }
+
+ @Test
+ void testLookup() {
+ def ffValidator = { TestRunner runner ->
+ def ffs = runner.getFlowFilesForRelationship(GetMongoRecord.REL_SUCCESS)
+ Assert.assertNotNull(ffs)
+ Assert.assertTrue(ffs.size() == 1)
+ Assert.assertEquals("3", ffs[0].getAttribute("record.count"))
+ Assert.assertEquals("application/json", ffs[0].getAttribute(CoreAttributes.MIME_TYPE.key()))
+ Assert.assertEquals(COL_NAME, ffs[0].getAttribute(GetMongoRecord.COL_NAME))
+ Assert.assertEquals(DB_NAME, ffs[0].getAttribute(GetMongoRecord.DB_NAME))
+ Assert.assertEquals(Document.parse("{}"), Document.parse(ffs[0].getAttribute("executed.query")))
+ }
+
+ runner.setProperty(GetMongoRecord.QUERY_ATTRIBUTE, "executed.query")
+ runner.setProperty(GetMongoRecord.QUERY, "{}")
+ runner.enqueue("", [ "schema.name": "sample"])
+ runner.run()
+
+ runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(GetMongoRecord.REL_ORIGINAL, 1)
+
+ ffValidator(runner)
+
+ runner.clearTransferState()
+ runner.removeProperty(GetMongoRecord.QUERY)
+ runner.enqueue("{}", [ "schema.name": "sample"])
+ runner.run()
+
+ runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(GetMongoRecord.REL_ORIGINAL, 1)
+
+ ffValidator(runner)
+ }
+
+ @Test
+ void testSortAndProjection() {
+ runner.setIncomingConnection(false)
+ runner.setVariable("schema.name", "sample")
+ runner.setProperty(GetMongoRecord.SORT, toJson([failedLogins: 1]))
+ runner.setProperty(GetMongoRecord.PROJECTION, toJson([failedLogins: 1]))
+ runner.setProperty(GetMongoRecord.QUERY, "{}")
+ runner.run()
+
+ def parsed = sharedTest()
+ Assert.assertEquals(3, parsed.size())
+ def values = [1, 2, 4]
+ int index = 0
+ parsed.each {
+ Assert.assertEquals(values[index++], it["failedLogins"])
+ Assert.assertNull(it["name"])
+ Assert.assertNull(it["lastLogin"])
+ }
+ }
+
+ List<Map<String, Object>> sharedTest() {
+ runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
+
+ def ff = runner.getFlowFilesForRelationship(GetMongoRecord.REL_SUCCESS)[0]
+ def raw = runner.getContentAsByteArray(ff)
+ String content = new String(raw)
+ def parsed = new JsonSlurper().parseText(content)
+ Assert.assertNotNull(parsed)
+
+ parsed
+ }
+
+ @Test
+ void testLimit() {
+ runner.setIncomingConnection(false)
+ runner.setProperty(GetMongoRecord.LIMIT, "1")
+ runner.setProperty(GetMongoRecord.QUERY, "{}")
+ runner.setVariable("schema.name", "sample")
+ runner.run()
+
+ def parsed = sharedTest()
+ Assert.assertEquals(1, parsed.size())
+
+ }
+}