You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/07/03 02:03:50 UTC

nifi git commit: NIFI-5059 Updated MongoDBLookupService to be able to detect record schemas or take one provided by the user.

Repository: nifi
Updated Branches:
  refs/heads/master d4d4ddade -> 22ec069ac


NIFI-5059 Updated MongoDBLookupService to be able to detect record schemas or take one provided by the user.

NIFI-5059 Changed it to use a schema registry.

NIFI-5059 Updated MongoDBLookupService to be a SchemaRegistryService.

NIFI-5059 Added two changes from a code review.

NIFI-5059 Fixed two bad references.

NIFI-5059 Refactored schema strategy handling.

NIFI-5059 Moved schema strategy handling to JsonInferenceSchemaRegistryService.

NIFI-5059 Updated to use new LookupService method.

NIFI-5059 fixed schema inference bug.

NIFI-5059 Added test for schema text strategy

NIFI-5059 incremented version number to make the build work.

NIFI-5059 fixed a stray 1.7.0 reference.

NIFI-5059 Added getDatabase to client service.

NIFI-5059 Added changes requested in a code review.

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2619


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/22ec069a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/22ec069a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/22ec069a

Branch: refs/heads/master
Commit: 22ec069acb39d59388d4adc4965929cfdba8f36b
Parents: d4d4dda
Author: Mike Thomsen <mi...@gmail.com>
Authored: Mon Apr 9 07:28:40 2018 -0400
Committer: Matthew Burgess <ma...@apache.org>
Committed: Mon Jul 2 21:57:50 2018 -0400

----------------------------------------------------------------------
 .../nifi-avro-record-utils/pom.xml              |   6 +
 .../schema/access/InferenceSchemaStrategy.java  |  87 +++++++++++
 .../nifi/schema/access/SchemaAccessUtils.java   |   2 +-
 .../JsonInferenceSchemaRegistryService.java     | 108 ++++++++++++++
 .../serialization/SchemaRegistryService.java    |  16 +-
 .../nifi/serialization/FakeProcessor.groovy     |  44 ++++++
 ...estJsonInferenceSchemaRegistryService.groovy |  59 ++++++++
 .../schema/access/JsonSchemaAccessStrategy.java |  36 +++++
 .../nifi/mongodb/MongoDBClientService.java      |   4 +
 .../nifi-mongodb-services/pom.xml               |  12 ++
 .../nifi/mongodb/MongoDBControllerService.java  |  16 ++
 .../nifi/mongodb/MongoDBLookupService.java      | 132 ++++++++++++-----
 .../nifi/mongodb/MongoDBLookupServiceIT.java    | 148 ++++++++++++++++---
 .../apache/nifi/mongodb/StubSchemaRegistry.java |  49 ++++++
 .../src/test/resources/simple.avsc              |   7 +
 15 files changed, 663 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
index 864581e..05e62f4 100755
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
@@ -43,6 +43,12 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.8.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
new file mode 100644
index 0000000..6cfd35d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.codehaus.jackson.map.ObjectMapper;
+import sun.misc.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InferenceSchemaStrategy implements JsonSchemaAccessStrategy {
+    private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
+
+    @Override
+    public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+        byte[] bytes = IOUtils.readFully(contentStream, -1, true);
+        ObjectMapper mapper = new ObjectMapper();
+
+        return convertSchema(mapper.readValue(bytes, Map.class));
+    }
+
+    protected RecordSchema convertSchema(Map<String, Object> result) {
+        List<RecordField> fields = new ArrayList<>();
+        for (Map.Entry<String, Object> entry : result.entrySet()) {
+
+            RecordField field;
+            if (entry.getValue() instanceof Integer) {
+                field = new RecordField(entry.getKey(), RecordFieldType.INT.getDataType());
+            } else if (entry.getValue() instanceof Long) {
+                field = new RecordField(entry.getKey(), RecordFieldType.LONG.getDataType());
+            } else if (entry.getValue() instanceof Boolean) {
+                field = new RecordField(entry.getKey(), RecordFieldType.BOOLEAN.getDataType());
+            } else if (entry.getValue() instanceof Double) {
+                field = new RecordField(entry.getKey(), RecordFieldType.DOUBLE.getDataType());
+            } else if (entry.getValue() instanceof Date) {
+                field = new RecordField(entry.getKey(), RecordFieldType.DATE.getDataType());
+            } else if (entry.getValue() instanceof List) {
+                field = new RecordField(entry.getKey(), RecordFieldType.ARRAY.getDataType());
+            } else if (entry.getValue() instanceof Map) {
+                RecordSchema nestedSchema = convertSchema((Map)entry.getValue());
+                RecordDataType rdt = new RecordDataType(nestedSchema);
+                field = new RecordField(entry.getKey(), rdt);
+            } else {
+                field = new RecordField(entry.getKey(), RecordFieldType.STRING.getDataType());
+            }
+            fields.add(field);
+        }
+
+        return new SimpleRecordSchema(fields);
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return schemaFields;
+    }
+
+    @Override
+    public RecordSchema getSchema(Map<String, String> variables, Map<String, Object> content, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+        return convertSchema(content);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
index 82ea240..7921dff 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
@@ -50,7 +50,7 @@ public class SchemaAccessUtils {
         "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single "
             + "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. "
             + "This is based on version 3.2.x of the Confluent Schema Registry.");
-
+    public static final AllowableValue INFER_SCHEMA = new AllowableValue("infer", "Infer from Result");
 
     public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
             .name("schema-registry")

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java
new file mode 100644
index 0000000..b3819cf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.schema.access.AvroSchemaTextStrategy;
+import org.apache.nifi.schema.access.InferenceSchemaStrategy;
+import org.apache.nifi.schema.access.JsonSchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaNamePropertyStrategy;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.schema.access.SchemaAccessUtils.INFER_SCHEMA;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
+
+public class JsonInferenceSchemaRegistryService extends SchemaRegistryService {
+    private String schemaAccess;
+
+    @OnEnabled
+    public void onEnabled(ConfigurationContext context) {
+        this.storeSchemaAccessStrategy(context);
+        this.schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
+    }
+
+    @Override
+    protected AllowableValue getDefaultSchemaAccessStrategy() {
+        return INFER_SCHEMA;
+    }
+
+    @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
+        if (strategy == null) {
+            return null;
+        }
+
+        if (strategy.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
+            final PropertyValue schemaName = context.getProperty(SCHEMA_NAME);
+            final PropertyValue schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME);
+            final PropertyValue schemaVersion = context.getProperty(SCHEMA_VERSION);
+            return new SchemaNamePropertyStrategy(schemaRegistry, schemaName, schemaBranchName, schemaVersion);
+        } else if (strategy.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
+            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
+        } else if (strategy.equalsIgnoreCase(INFER_SCHEMA.getValue())) {
+            return new InferenceSchemaStrategy();
+        }
+
+        return null;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(2);
+
+        final AllowableValue[] strategies = new AllowableValue[] {
+            SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, INFER_SCHEMA
+        };
+
+        properties.add(buildStrategyProperty(strategies));
+
+        properties.add(SCHEMA_REGISTRY);
+        properties.add(SCHEMA_NAME);
+        properties.add(SCHEMA_VERSION);
+        properties.add(SCHEMA_BRANCH_NAME);
+        properties.add(SCHEMA_TEXT);
+
+        return properties;
+    }
+
+    public RecordSchema getSchema(Map<String, String> variables, Map<String, Object> content, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+        if (schemaAccess.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue()) || schemaAccess.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
+            return getSchema(variables, readSchema);
+        } else {
+            return ((JsonSchemaAccessStrategy)schemaAccessStrategy).getSchema(variables, content, readSchema);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
index b299191..6923f48 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -58,7 +58,7 @@ import static org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_
 public abstract class SchemaRegistryService extends AbstractControllerService {
 
     private volatile ConfigurationContext configurationContext;
-    private volatile SchemaAccessStrategy schemaAccessStrategy;
+    protected volatile SchemaAccessStrategy schemaAccessStrategy;
     private static InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
 
     private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
@@ -68,16 +68,20 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
         return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
     }
 
+    protected PropertyDescriptor buildStrategyProperty(AllowableValue[] values) {
+        return new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
+            .allowableValues(values)
+            .defaultValue(getDefaultSchemaAccessStrategy().getValue())
+            .build();
+    }
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>(2);
 
         final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
-        properties.add(new PropertyDescriptor.Builder()
-            .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
-            .allowableValues(strategies)
-            .defaultValue(getDefaultSchemaAccessStrategy().getValue())
-            .build());
+        properties.add(buildStrategyProperty(strategies));
 
         properties.add(SCHEMA_REGISTRY);
         properties.add(SCHEMA_NAME);

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy
new file mode 100644
index 0000000..6cf32b7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization
+
+import org.apache.nifi.components.PropertyDescriptor
+import org.apache.nifi.processor.AbstractProcessor
+import org.apache.nifi.processor.ProcessContext
+import org.apache.nifi.processor.ProcessSession
+import org.apache.nifi.processor.exception.ProcessException
+
+class FakeProcessor extends AbstractProcessor {
+    static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+        .name("Schema Service")
+        .description("")
+        .identifiesControllerService(JsonInferenceSchemaRegistryService.class)
+        .required(true)
+        .build()
+
+    @Override
+    void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return [
+            CLIENT_SERVICE
+        ]
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
new file mode 100644
index 0000000..c930452
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization
+
+import org.apache.nifi.schema.access.SchemaAccessUtils
+import org.apache.nifi.serialization.record.type.RecordDataType
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Test
+
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY
+
+class TestJsonInferenceSchemaRegistryService {
+    @Test
+    void testInfer() {
+        def runner = TestRunners.newTestRunner(FakeProcessor.class)
+        def service = new JsonInferenceSchemaRegistryService()
+        runner.addControllerService("schemaService", service)
+        runner.setProperty(FakeProcessor.CLIENT_SERVICE, "schemaService")
+        runner.setProperty(service, service.getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName()), SchemaAccessUtils.INFER_SCHEMA)
+        runner.enableControllerService(service)
+        runner.assertValid()
+
+        def json = [
+            name: "John Smith",
+            age: 35,
+            contact: [
+                email: "john.smith@example.com",
+                phone: "123-456-7890"
+            ]
+        ]
+
+        def schema = service.getSchema([:], json, null)
+
+        Assert.assertNotNull(schema)
+        def name = schema.getField("name")
+        def age  = schema.getField("age")
+        def contact = schema.getField("contact")
+        Assert.assertTrue(name.isPresent())
+        Assert.assertTrue(age.isPresent())
+        Assert.assertTrue(contact.isPresent())
+        Assert.assertTrue(contact.get().dataType instanceof RecordDataType)
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java
new file mode 100644
index 0000000..30dcfe5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface JsonSchemaAccessStrategy extends SchemaAccessStrategy {
+    /**
+     * Get a schema using a Map object instead of an input stream. This is meant to be used with JSON toolkits.
+     *
+     * @param variables Variables which is used to resolve Record Schema via Expression Language.
+     *                  This can be null or empty.
+     * @param content   JSON content in a Map object form.
+     * @param readSchema  The schema that was read from the input content, or <code>null</code> if there was none.
+     * @return The RecordSchema if found.
+     */
+    RecordSchema getSchema(Map<String, String> variables, Map<String, Object> content, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
index b0f1618..5a3a4b2 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.mongodb;
 
+import com.mongodb.client.MongoDatabase;
 import org.apache.nifi.controller.ControllerService;
 import org.bson.Document;
 
@@ -31,6 +32,7 @@ public interface MongoDBClientService extends ControllerService {
     void delete(Document query);
     boolean exists(Document query);
     Document findOne(Document query);
+    Document findOne(Document query, Document projection);
     List<Document> findMany(Document query);
     List<Document> findMany(Document query, int limit);
     List<Document> findMany(Document query, Document sort, int limit);
@@ -42,4 +44,6 @@ public interface MongoDBClientService extends ControllerService {
     void upsert(Document query, Document update);
     void dropDatabase();
     void dropCollection();
+
+    MongoDatabase getDatabase(String name);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml
index c595508..4b54073 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml
@@ -78,6 +78,17 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.8.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
 
     </dependencies>
 
@@ -88,6 +99,7 @@
                 <artifactId>apache-rat-plugin</artifactId>
                 <configuration>
                     <excludes combine.children="append">
+                        <exclude>src/test/resources/simple.avsc</exclude>
                         <exclude>src/test/resources/test.csv</exclude>
                         <exclude>src/test/resources/test.properties</exclude>
                         <exclude>src/test/resources/test.xml</exclude>

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
index 0faed0d..a1f9b2b 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
@@ -80,6 +80,17 @@ public class MongoDBControllerService extends AbstractMongoDBControllerService i
     }
 
     @Override
+    public Document findOne(Document query, Document projection) {
+        MongoCursor<Document> cursor  = projection != null
+                ? this.col.find(query).projection(projection).limit(1).iterator()
+                : this.col.find(query).limit(1).iterator();
+        Document retVal = cursor.tryNext();
+        cursor.close();
+
+        return retVal;
+    }
+
+    @Override
     public List<Document> findMany(Document query) {
         return findMany(query, null, -1);
     }
@@ -153,4 +164,9 @@ public class MongoDBControllerService extends AbstractMongoDBControllerService i
         this.col.drop();
         this.col = null;
     }
+
+    @Override
+    public MongoDatabase getDatabase(String name) {
+        return mongoClient.getDatabase(name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
index fba2287..6c4905a 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
@@ -20,22 +20,20 @@ package org.apache.nifi.mongodb;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.lookup.LookupFailureException;
 import org.apache.nifi.lookup.LookupService;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.StringUtils;
 import org.bson.Document;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,6 +41,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.schema.access.SchemaAccessUtils.INFER_SCHEMA;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
 
 @Tags({"mongo", "mongodb", "lookup", "record"})
 @CapabilityDescription(
@@ -52,31 +61,49 @@ import java.util.Set;
     "The query is limited to the first result (findOne in the Mongo documentation). If no \"Lookup Value Field\" is specified " +
     "then the entire MongoDB result document minus the _id field will be returned as a record."
 )
-public class MongoDBLookupService extends MongoDBControllerService implements LookupService<Object> {
+public class MongoDBLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Object> {
+    public static final PropertyDescriptor CONTROLLER_SERVICE = new PropertyDescriptor.Builder()
+        .name("mongo-lookup-client-service")
+        .displayName("Client Service")
+        .description("A MongoDB controller service to use with this lookup service.")
+        .required(true)
+        .identifiesControllerService(MongoDBClientService.class)
+        .build();
 
     public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new PropertyDescriptor.Builder()
-            .name("mongo-lookup-value-field")
-            .displayName("Lookup Value Field")
-            .description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " +
-                    "MongoDB result document minus the _id field will be returned as a record.")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(false)
-            .build();
+        .name("mongo-lookup-value-field")
+        .displayName("Lookup Value Field")
+        .description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " +
+                "MongoDB result document minus the _id field will be returned as a record.")
+        .addValidator(Validator.VALID)
+        .required(false)
+        .build();
+    public static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
+        .name("mongo-lookup-projection")
+        .displayName("Projection")
+        .description("Specifies a projection for limiting which fields will be returned.")
+        .required(false)
+        .build();
 
     private String lookupValueField;
 
-    private static final List<PropertyDescriptor> lookupDescriptors;
-
-    static {
-        lookupDescriptors = new ArrayList<>();
-        lookupDescriptors.addAll(descriptors);
-        lookupDescriptors.add(LOOKUP_VALUE_FIELD);
+    @Override
+    public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+        /*
+         * Unless the user hard-coded schema.name or schema.text into the schema access options, this is going
+         * to force schema detection.
+         */
+        return lookup(coordinates, new HashMap<>());
     }
 
     @Override
-    public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException {
-        Map<String, Object> clean = new HashMap<>();
-        clean.putAll(coordinates);
+    public Optional<Object> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
+        Map<String, Object> clean = coordinates.entrySet().stream()
+            .filter(e -> !schemaNameProperty.equals(String.format("${%s}", e.getKey())))
+            .collect(Collectors.toMap(
+                e -> e.getKey(),
+                e -> e.getValue()
+            ));
         Document query = new Document(clean);
 
         if (coordinates.size() == 0) {
@@ -84,23 +111,15 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo
         }
 
         try {
-            Document result = this.findOne(query);
+            Document result = projection != null ? controllerService.findOne(query, projection) : controllerService.findOne(query);
 
             if(result == null) {
                 return Optional.empty();
             } else if (!StringUtils.isEmpty(lookupValueField)) {
                 return Optional.ofNullable(result.get(lookupValueField));
             } else {
-                final List<RecordField> fields = new ArrayList<>();
+                RecordSchema schema = loadSchema(context, result);
 
-                for (String key : result.keySet()) {
-                    if (key.equals("_id")) {
-                        continue;
-                    }
-                    fields.add(new RecordField(key, RecordFieldType.STRING.getDataType()));
-                }
-
-                final RecordSchema schema = new SimpleRecordSchema(fields);
                 return Optional.ofNullable(new MapRecord(schema, result));
             }
         } catch (Exception ex) {
@@ -109,10 +128,32 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo
         }
     }
 
-    @Override
+    private RecordSchema loadSchema(Map<String, String> context, Document doc) {
+        try {
+            return getSchema(context, doc, null);
+        } catch (Exception ex) {
+            return null;
+        }
+    }
+
+    private volatile Document projection;
+    private MongoDBClientService controllerService;
+    private String schemaNameProperty;
+
     @OnEnabled
-    public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
+    public void onEnabled(final ConfigurationContext context) {
         this.lookupValueField = context.getProperty(LOOKUP_VALUE_FIELD).getValue();
+        this.controllerService = context.getProperty(CONTROLLER_SERVICE).asControllerService(MongoDBClientService.class);
+
+        this.schemaNameProperty = context.getProperty(SchemaAccessUtils.SCHEMA_NAME).getValue();
+
+        String configuredProjection = context.getProperty(PROJECTION).isSet()
+            ? context.getProperty(PROJECTION).getValue()
+            : null;
+        if (!StringUtils.isBlank(configuredProjection)) {
+            projection = Document.parse(configuredProjection);
+        }
+
         super.onEnabled(context);
     }
 
@@ -128,6 +169,25 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return lookupDescriptors;
+        AllowableValue[] strategies = new AllowableValue[] {
+            SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, INFER_SCHEMA
+        };
+        List<PropertyDescriptor> _temp = new ArrayList<>();
+        _temp.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
+                .allowableValues(strategies)
+                .defaultValue(getDefaultSchemaAccessStrategy().getValue())
+                .build());
+
+        _temp.add(SCHEMA_REGISTRY);
+        _temp.add(SCHEMA_NAME);
+        _temp.add(SCHEMA_VERSION);
+        _temp.add(SCHEMA_BRANCH_NAME);
+        _temp.add(SCHEMA_TEXT);
+        _temp.add(CONTROLLER_SERVICE);
+        _temp.add(LOOKUP_VALUE_FIELD);
+        _temp.add(PROJECTION);
+
+        return Collections.unmodifiableList(_temp);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
index dcb3fb1..2c7f522 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
@@ -17,8 +17,14 @@
 
 package org.apache.nifi.mongodb;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.bson.Document;
@@ -27,8 +33,12 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.sql.Timestamp;
+import java.util.Arrays;
 import java.util.Calendar;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -38,36 +48,49 @@ public class MongoDBLookupServiceIT {
 
     private TestRunner runner;
     private MongoDBLookupService service;
+    private MongoDBControllerService controllerService;
 
     @Before
     public void before() throws Exception {
         runner = TestRunners.newTestRunner(TestLookupServiceProcessor.class);
         service = new MongoDBLookupService();
+        controllerService = new MongoDBControllerService();
         runner.addControllerService("Client Service", service);
-        runner.setProperty(service, MongoDBLookupService.DATABASE_NAME, DB_NAME);
-        runner.setProperty(service, MongoDBLookupService.COLLECTION_NAME, COL_NAME);
-        runner.setProperty(service, MongoDBLookupService.URI, "mongodb://localhost:27017");
+        runner.addControllerService("Client Service 2", controllerService);
+        runner.setProperty(TestLookupServiceProcessor.CLIENT_SERVICE, "Client Service");
+        runner.setProperty(controllerService, MongoDBControllerService.DATABASE_NAME, DB_NAME);
+        runner.setProperty(controllerService, MongoDBControllerService.COLLECTION_NAME, COL_NAME);
+        runner.setProperty(controllerService, MongoDBControllerService.URI, "mongodb://localhost:27017");
         runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
+        runner.setProperty(service, MongoDBLookupService.CONTROLLER_SERVICE, "Client Service 2");
+        SchemaRegistry registry = new StubSchemaRegistry();
+        runner.addControllerService("registry", registry);
+        runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "");
+        runner.setProperty(service, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
+        runner.enableControllerService(registry);
+        runner.enableControllerService(controllerService);
+        runner.enableControllerService(service);
     }
 
     @After
-    public void after() throws Exception {
-        service.dropDatabase();
-        service.onDisable();
+    public void after() {
+        controllerService.dropDatabase();
+        controllerService.onDisable();
     }
 
     @Test
-    public void testInit() throws Exception {
-        runner.enableControllerService(service);
+    public void testInit() {
         runner.assertValid(service);
+
     }
 
     @Test
     public void testLookupSingle() throws Exception {
+        runner.disableControllerService(service);
         runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
         runner.enableControllerService(service);
-        Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
-        service.insert(document);
+        Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
+        controllerService.insert(document);
 
         Map<String, Object> criteria = new HashMap<>();
         criteria.put("uuid", "x-y-z");
@@ -78,7 +101,7 @@ public class MongoDBLookupServiceIT {
 
         Map<String, Object> clean = new HashMap<>();
         clean.putAll(criteria);
-        service.delete(new Document(clean));
+        controllerService.delete(new Document(clean));
 
         try {
             result = service.lookup(criteria);
@@ -90,11 +113,85 @@ public class MongoDBLookupServiceIT {
     }
 
     @Test
+    public void testWithSchemaRegistry() throws Exception {
+        runner.assertValid();
+
+        controllerService.insert(new Document()
+            .append("username", "john.smith")
+            .append("password", "testing1234")
+        );
+
+        Map<String, Object> criteria = new HashMap<>();
+        criteria.put("username", "john.smith");
+        Map<String, String> context = new HashMap<>();
+        context.put("schema.name", "user");
+        Optional result = service.lookup(criteria, context);
+        Assert.assertTrue(result.isPresent());
+        Assert.assertNotNull(result.get());
+        MapRecord record = (MapRecord)result.get();
+
+        Assert.assertEquals("john.smith", record.getAsString("username"));
+        Assert.assertEquals("testing1234", record.getAsString("password"));
+
+        /*
+         * Test falling back on schema detection if a user doesn't specify the context argument
+         */
+        result = service.lookup(criteria);
+        Assert.assertTrue(result.isPresent());
+        Assert.assertNotNull(result.get());
+        record = (MapRecord)result.get();
+
+        Assert.assertEquals("john.smith", record.getAsString("username"));
+        Assert.assertEquals("testing1234", record.getAsString("password"));
+    }
+
+    @Test
+    public void testSchemaTextStrategy() throws Exception {
+        byte[] contents = IOUtils.toByteArray(getClass().getResourceAsStream("/simple.avsc"));
+
+        runner.disableControllerService(service);
+        runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "");
+        runner.setProperty(service, MongoDBLookupService.PROJECTION, "{ \"_id\": 0 }");
+        runner.setProperty(service, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(service, SchemaAccessUtils.SCHEMA_TEXT, "${schema.text}");
+        runner.enableControllerService(service);
+        runner.assertValid();
+
+        controllerService.insert(new Document().append("msg", "Testing1234"));
+
+        Map<String, Object> criteria = new HashMap<>();
+        criteria.put("msg", "Testing1234");
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("schema.text", new String(contents));
+
+        Optional results = service.lookup(criteria, attrs);
+        Assert.assertNotNull(results);
+        Assert.assertTrue(results.isPresent());
+    }
+
+    @Test
     public void testLookupRecord() throws Exception {
+        runner.disableControllerService(service);
         runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "");
+        runner.setProperty(service, MongoDBLookupService.PROJECTION, "{ \"_id\": 0 }");
         runner.enableControllerService(service);
-        Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
-        service.insert(document);
+
+        Date d = new Date();
+        Timestamp ts = new Timestamp(new Date().getTime());
+        List list = Arrays.asList("a", "b", "c", "d", "e");
+
+        controllerService.insert(new Document()
+            .append("uuid", "x-y-z")
+            .append("dateField", d)
+            .append("longField", 10000L)
+            .append("stringField", "Hello, world")
+            .append("timestampField", ts)
+            .append("decimalField", Double.MAX_VALUE / 2.0)
+            .append("subrecordField", new Document()
+                .append("nestedString", "test")
+                .append("nestedLong", new Long(1000)))
+            .append("arrayField", list)
+        );
 
         Map<String, Object> criteria = new HashMap<>();
         criteria.put("uuid", "x-y-z");
@@ -103,12 +200,24 @@ public class MongoDBLookupServiceIT {
         Assert.assertNotNull("The value was null.", result.get());
         Assert.assertTrue("The value was wrong.", result.get() instanceof MapRecord);
         MapRecord record = (MapRecord)result.get();
-        Assert.assertEquals("The value was wrong.", "Hello, world", record.getAsString("message"));
-        Assert.assertEquals("The value was wrong.", "x-y-z", record.getAsString("uuid"));
+        RecordSchema subSchema = ((RecordDataType)record.getSchema().getField("subrecordField").get().getDataType()).getChildSchema();
+
+        Assert.assertEquals("The value was wrong.", "Hello, world", record.getValue("stringField"));
+        Assert.assertEquals("The value was wrong.", "x-y-z", record.getValue("uuid"));
+        Assert.assertEquals(new Long(10000), record.getValue("longField"));
+        Assert.assertEquals((Double.MAX_VALUE / 2.0), record.getValue("decimalField"));
+        Assert.assertEquals(d, record.getValue("dateField"));
+        Assert.assertEquals(ts.getTime(), ((Date)record.getValue("timestampField")).getTime());
+
+        Record subRecord = record.getAsRecord("subrecordField", subSchema);
+        Assert.assertNotNull(subRecord);
+        Assert.assertEquals("test", subRecord.getValue("nestedString"));
+        Assert.assertEquals(new Long(1000), subRecord.getValue("nestedLong"));
+        Assert.assertEquals(list, record.getValue("arrayField"));
 
         Map<String, Object> clean = new HashMap<>();
         clean.putAll(criteria);
-        service.delete(new Document(clean));
+        controllerService.delete(new Document(clean));
 
         try {
             result = service.lookup(criteria);
@@ -120,10 +229,9 @@ public class MongoDBLookupServiceIT {
     }
 
     @Test
-    public void testServiceParameters() throws Exception {
-        runner.enableControllerService(service);
-        Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
-        service.insert(document);
+    public void testServiceParameters() {
+        Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
+        controllerService.insert(document);
 
         Map<String, Object> criteria = new HashMap<>();
         criteria.put("uuid", "x-y-z");

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/StubSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/StubSchemaRegistry.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/StubSchemaRegistry.java
new file mode 100644
index 0000000..5e17258
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/StubSchemaRegistry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.mongodb;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class StubSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
+    @Override
+    public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) {
+        List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("username", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("password", RecordFieldType.STRING.getDataType()));
+        return new SimpleRecordSchema(fields);
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return new HashSet<SchemaField>() {{
+            add(SchemaField.SCHEMA_NAME);
+        }};
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc
new file mode 100644
index 0000000..2bea9cb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc
@@ -0,0 +1,7 @@
+{
+    "type": "record",
+    "name": "SimpleRecord",
+    "fields": [
+        { "name": "msg", "type": "string" }
+    ]
+}
\ No newline at end of file