You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2020/12/01 16:15:14 UTC

[nifi] branch main updated: NIFI-7906: parameterized graph query NIFI-7906: addressing PR concerns NIFI-7906: code styling fixes NIFI-7906: adding in license information to new files + enables processor in META-INF NIFI-7906: exclude test files from RAT NIFI-7906: PR refactor to streamline graph response NIFI-7906: removing ERRORS output Unused after refactor Did a few cleanups for the contributor.

This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c29cced  NIFI-7906: parameterized graph query NIFI-7906: addressing PR concerns NIFI-7906: code styling fixes NIFI-7906: adding in license information to new files + enables processor in META-INF NIFI-7906: exclude test files from RAT NIFI-7906: PR refactor to streamline graph response NIFI-7906: removing ERRORS output Unused after refactor Did a few cleanups for the contributor.
c29cced is described below

commit c29cced269dcce28fb9ba034025d01e76a79b037
Author: Levi Lentz <le...@gmail.com>
AuthorDate: Thu Oct 22 19:00:34 2020 -0400

    NIFI-7906: parameterized graph query
    NIFI-7906: addressing PR concerns
    NIFI-7906: code styling fixes
    NIFI-7906: adding in license information to new files
    + enables processor in META-INF
    NIFI-7906: exclude test files from RAT
    NIFI-7906: PR refactor to streamline graph response
    NIFI-7906: removing ERRORS output
    Unused after refactor
    Did a few cleanups for the contributor.
    
    This closes #4638
    
    Signed-off-by: Mike Thomsen <mt...@apache.org>
---
 .../nifi-graph-processors/pom.xml                  |  55 +++++
 .../processors/graph/ExecuteGraphQueryRecord.java  | 270 +++++++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../graph/ExecuteGraphQueryRecordTest.java         | 176 ++++++++++++++
 .../processors/graph/util/InMemoryGraphClient.java | 113 +++++++++
 .../src/test/resources/testAttributes.json         |   3 +
 .../src/test/resources/testComplexFlowFile.json    |   9 +
 .../src/test/resources/testFlowFileContent.json    |   3 +
 .../src/test/resources/testFlowFileList.json       |   3 +
 9 files changed, 633 insertions(+)

diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
index 9b68f45..1a9b6f6 100644
--- a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
@@ -78,5 +78,60 @@
             <version>1.13.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-json-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/testAttributes.json</exclude>
+                        <exclude>src/test/resources/testComplexFlowFile.json</exclude>
+                        <exclude>src/test/resources/testFlowFileContent.json</exclude>
+                        <exclude>src/test/resources/testFlowFileList.json</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
new file mode 100644
index 0000000..e750b6c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.graph.GraphClientService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+// "graph" and "gremlin" are declared as two separate tags so the processor is searchable by either keyword.
+@Tags({"graph", "gremlin"})
+@CapabilityDescription("This uses a flowfile as input to perform graph mutations.")
+@WritesAttributes({
+        @WritesAttribute(attribute = ExecuteGraphQueryRecord.GRAPH_OPERATION_TIME, description = "The amount of time it took to execute all of the graph operations."),
+        @WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT, description = "The amount of record processed")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "A dynamic property to be used as a parameter in the graph script", value = "The variable name to be set",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Uses a record path to set a variable as a parameter in the graph script")
+public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
+
+    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("client-service")
+            .displayName("Client Service")
+            .description("The graph client service for connecting to a graph database.")
+            .identifiesControllerService(GraphClientService.class)
+            .addValidator(Validator.VALID)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor READER_SERVICE = new PropertyDescriptor.Builder()
+            .name("reader-service")
+            .displayName("Record Reader")
+            .description("The record reader to use with this processor.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor WRITER_SERVICE = new PropertyDescriptor.Builder()
+            .name("writer-service")
+            .displayName("Failed Record Writer")
+            .description("The record writer to use for writing failed records.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor SUBMISSION_SCRIPT = new PropertyDescriptor.Builder()
+            .name("record-script")
+            .displayName("Graph Record Script")
+            .description("Script to perform the business logic on graph, using flow file attributes and custom properties " +
+                    "as variable-value pairs in its logic.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        // Every user-added property becomes an optional dynamic descriptor that may use flowfile attribute expressions.
+        final PropertyDescriptor.Builder dynamicBuilder = new PropertyDescriptor.Builder();
+        dynamicBuilder.name(propertyDescriptorName).required(false).dynamic(true);
+        dynamicBuilder.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES);
+        dynamicBuilder.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR);
+        return dynamicBuilder.build();
+    }
+
+    public static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+            CLIENT_SERVICE, READER_SERVICE, WRITER_SERVICE, SUBMISSION_SCRIPT
+    ));
+
+    public static final Relationship SUCCESS = new Relationship.Builder().name("original")
+                                                    .description("Original flow files that successfully interacted with " +
+                                                            "graph server.")
+                                                    .build();
+    public static final Relationship FAILURE = new Relationship.Builder().name("failure")
+                                                    .description("Flow files that fail to interact with graph server.")
+                                                    .build();
+    public static final Relationship GRAPH = new Relationship.Builder().name("response")
+                                                    .description("The response object from the graph server.")
+                                                    .autoTerminateDefault(true)
+                                                    .build();
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            SUCCESS, FAILURE, GRAPH
+    )));
+
+    public static final String RECORD_COUNT = "records.count";
+    public static final String GRAPH_OPERATION_TIME = "graph.operations.took";
+    private volatile RecordPathCache recordPathCache;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    private GraphClientService clientService;
+    private RecordReaderFactory recordReaderFactory;
+    private RecordSetWriterFactory recordSetWriterFactory;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class);
+        recordReaderFactory = context.getProperty(READER_SERVICE).asControllerService(RecordReaderFactory.class);
+        recordSetWriterFactory = context.getProperty(WRITER_SERVICE).asControllerService(RecordSetWriterFactory.class);
+        recordPathCache = new RecordPathCache(100);
+    }
+
+    // Collects every non-null value that the record path selects from the given record.
+    private List<Object> getRecordValue(Record record, RecordPath recordPath) {
+        final RecordPathResult evaluation = recordPath.evaluate(record);
+        return evaluation.getSelectedFields().map(FieldValue::getValue)
+                .filter(value -> value != null)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Reads the incoming flowfile record by record and runs the graph script once per record.
+     * Each record's responses are written as JSON to a child flowfile routed to "response" and the
+     * input is routed to "original"; on any error all children are removed and the input goes to "failure".
+     *
+     * @param context supplies the script and the dynamic record path properties
+     * @param session session used to read the input and to create, write and route flowfiles
+     * @throws ProcessException declared by the processor contract
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile input = session.get();
+        if ( input == null ) {
+            return;
+        }
+
+        // Response flowfiles are tracked from creation so that a failure can remove every one of them.
+        List<FlowFile> graphList = new ArrayList<>();
+
+        String recordScript = context.getProperty(SUBMISSION_SCRIPT)
+                .evaluateAttributeExpressions(input)
+                .getValue();
+
+        // Dynamic property name -> compiled record path that extracts that script parameter from a record.
+        Map<String, RecordPath> dynamic = new HashMap<>();
+
+        FlowFile finalInput = input;
+        context.getProperties().keySet().stream()
+                .filter(PropertyDescriptor::isDynamic)
+                .forEach(it -> dynamic.put(it.getName(), recordPathCache.getCompiled(
+                        context.getProperty(it.getName()).evaluateAttributeExpressions(finalInput).getValue())));
+
+        boolean failed = false;
+        long elapsedMillis = 0;
+        try (InputStream is = session.read(input);
+             RecordReader reader = recordReaderFactory.createRecordReader(input, is, getLogger())) {
+            Record record;
+
+            long start = System.currentTimeMillis();
+            while ((record = reader.nextRecord()) != null) {
+                FlowFile graph = session.create(input);
+                // Register before anything below can throw; an untracked child would be left untransferred.
+                graphList.add(graph);
+
+                Map<String, Object> dynamicPropertyMap = new HashMap<>();
+                for (Map.Entry<String, RecordPath> entry : dynamic.entrySet()) {
+                    dynamicPropertyMap.put(entry.getKey(), getRecordValue(record, entry.getValue()));
+                }
+
+                // Flowfile attributes are added last, so they win over a dynamic property of the same name.
+                dynamicPropertyMap.putAll(input.getAttributes());
+                List<Map<String, Object>> graphResponses = executeQuery(recordScript, dynamicPropertyMap);
+
+                String graphOutput = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(graphResponses);
+                try (OutputStream graphOutputStream = session.write(graph)) {
+                    graphOutputStream.write(graphOutput.getBytes(StandardCharsets.UTF_8));
+                }
+            }
+            elapsedMillis = System.currentTimeMillis() - start;
+            if (getLogger().isDebugEnabled()){
+                getLogger().debug(String.format("Took %s seconds.", elapsedMillis / 1000));
+            }
+        } catch (Exception ex) {
+            getLogger().error("Failed to execute the graph script for the records of " + input, ex);
+            failed = true;
+        } finally {
+            if (failed) {
+                graphList.forEach(session::remove);
+                session.transfer(input, FAILURE);
+            } else {
+                // The time attribute keeps its whole-second value; provenance receives the untruncated duration.
+                input = session.putAttribute(input, GRAPH_OPERATION_TIME, String.valueOf(elapsedMillis / 1000));
+                input = session.putAttribute(input, RECORD_COUNT, String.valueOf(graphList.size()));
+                session.getProvenanceReporter().send(input, clientService.getTransitUrl(), elapsedMillis);
+                session.transfer(input, SUCCESS);
+                graphList.forEach(it -> session.transfer(it, GRAPH));
+            }
+        }
+    }
+
+    // Runs the script with the given parameters and returns each result map from the client callback; reuses the shared mapper.
+    private List<Map<String, Object>> executeQuery(String recordScript, Map<String, Object> parameters) {
+        List<Map<String, Object>> graphResponses = new ArrayList<>();
+        clientService.executeQuery(recordScript, parameters, (map, b) -> {
+            if (getLogger().isDebugEnabled()){
+                try {
+                    getLogger().debug(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map));
+                } catch (JsonProcessingException ex) {
+                    getLogger().error("Error converted map to JSON ", ex);
+                }
+            }
+            graphResponses.add(map);
+        });
+        return graphResponses;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 615c785..2ab7e95 100644
--- a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,3 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.processors.graph.ExecuteGraphQuery
+org.apache.nifi.processors.graph.ExecuteGraphQueryRecord
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
new file mode 100644
index 0000000..8c6cb96
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph;
+
+import groovy.json.JsonOutput;
+import org.apache.nifi.processors.graph.util.InMemoryGraphClient;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+
+public class ExecuteGraphQueryRecordTest {
+    private TestRunner runner;
+    private JsonTreeReader reader;
+    private InMemoryGraphClient graphClient;
+    Map<String, String> enqueProperties = new HashMap<>();
+
+    @Before
+    public void setup() throws InitializationException {
+        MockRecordWriter writer = new MockRecordWriter();
+        reader = new JsonTreeReader();
+        runner = TestRunners.newTestRunner(ExecuteGraphQueryRecord.class);
+        runner.addControllerService("reader", reader);
+        runner.addControllerService("writer", writer);
+        runner.setProperty(ExecuteGraphQueryRecord.READER_SERVICE, "reader");
+        runner.setProperty(ExecuteGraphQueryRecord.WRITER_SERVICE, "writer");
+
+        runner.enableControllerService(writer);
+        runner.enableControllerService(reader);
+
+        graphClient = new InMemoryGraphClient();
+
+
+        runner.addControllerService("graphClient", graphClient);
+
+        runner.setProperty(ExecuteGraphQueryRecord.CLIENT_SERVICE, "graphClient");
+        runner.enableControllerService(graphClient);
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[ 'testProperty': 'testResponse' ]");
+        runner.assertValid();
+        enqueProperties.put("graph.name", "graph");
+
+    }
+
+    @Test
+    public void testFlowFileContent() throws IOException {
+        List<Map> test = new ArrayList<>();
+        Map<String, Object> tempMap = new HashMap<>();
+        tempMap.put("M", 1);
+        test.add(tempMap);
+
+        byte[] json = JsonOutput.toJson(test).getBytes();
+        String submissionScript;
+        submissionScript = "[ 'M': M[0] ]";
+
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
+        runner.setProperty("M", "/M");
+        runner.enqueue(json, enqueProperties);
+
+        runner.run();
+        runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
+        MockFlowFile relGraph = runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
+        relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testFlowFileContent.json"));
+    }
+
+    @Test
+    public void testFlowFileList() throws IOException {
+        List<Map> test = new ArrayList<>();
+        Map<String, Object> tempMap = new HashMap<>();
+        tempMap.put("M", new ArrayList<Integer>(){
+            {
+                add(1);
+                add(2);
+                add(3);
+            }
+        });
+        test.add(tempMap);
+
+        byte[] json = JsonOutput.toJson(test).getBytes();
+        String submissionScript = "[ " +
+                "'M': M[0] " +
+                "]";
+
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
+        runner.setProperty("M", "/M");
+        runner.enqueue(json, enqueProperties);
+
+        runner.run();
+        runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
+        MockFlowFile relGraph = runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
+        relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testFlowFileList.json"));
+    }
+
+    @Test
+    public void testComplexFlowFile() throws IOException {
+        List<Map> test = new ArrayList<>();
+        Map<String, Object> tempMap = new HashMap<>();
+        tempMap.put("tMap", "123");
+        tempMap.put("L", new ArrayList<Integer>(){
+            {
+                add(1);
+                add(2);
+                add(3);
+            }
+        });
+        test.add(tempMap);
+
+        byte[] json = JsonOutput.toJson(test).getBytes();
+        String submissionScript = "Map<String, Object> vertexHashes = new HashMap()\n" +
+                "vertexHashes.put('1234', tMap[0])\n" +
+                "[ 'L': L[0], 'result': vertexHashes ]";
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
+        runner.setProperty("tMap", "/tMap");
+        runner.setProperty("L", "/L");
+        runner.enqueue(json, enqueProperties);
+
+        runner.run();
+        runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
+        MockFlowFile relGraph = runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
+        relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testComplexFlowFile.json"));
+    }
+
+    @Test
+    public void testAttributes() throws IOException {
+        List<Map<String, Object>> test = new ArrayList<>();
+        Map<String, Object> tempMap = new HashMap<>();
+        tempMap.put("tMap", "123");
+        test.add(tempMap);
+
+        byte[] json = JsonOutput.toJson(test).getBytes();
+        String submissionScript = "[ " +
+                "'testProperty': testProperty " +
+                "] ";
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
+        Map<String, String> enqueProperties = new HashMap<>();
+        enqueProperties.put("testProperty", "test");
+        runner.enqueue(json, enqueProperties);
+
+        runner.run();
+        runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
+        MockFlowFile relGraph = runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
+        relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testAttributes.json"));
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
new file mode 100644
index 0000000..a12d5b8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph.util;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.graph.GraphClientService;
+import org.apache.nifi.graph.GraphQueryResultCallback;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.janusgraph.core.JanusGraph;
+import org.janusgraph.core.JanusGraphFactory;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.AbstractMap.SimpleEntry;
+
+public class InMemoryGraphClient extends AbstractControllerService implements GraphClientService {
+    private Graph graph;
+
+    @OnEnabled
+    void onEnabled(ConfigurationContext context) {
+        graph = buildGraph();
+    }
+
+    private static JanusGraph buildGraph() {
+        return JanusGraphFactory.build().set("storage.backend", "inmemory").open();
+    }
+
+    @Override
+    public Map<String, String> executeQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback graphQueryResultCallback) {
+        ScriptEngine engine = new ScriptEngineManager().getEngineByName("groovy");
+        parameters.entrySet().stream().forEach( it -> {
+            engine.put(it.getKey(), it.getValue());
+        });
+        if (graph == null) {
+            graph = buildGraph();
+        }
+        engine.put("graph", graph);
+        engine.put("g", graph.traversal());
+
+        Object response;
+        try {
+            response = engine.eval(query);
+        } catch (ScriptException ex) {
+            throw new ProcessException(ex);
+        }
+
+        if (response instanceof Map) {
+            //The below logic helps with the handling of complex Map<String, Object> relationships
+            Map resultMap = (Map) response;
+            if (!resultMap.isEmpty()) {
+                // Converts the resultMap to an entrySet iterator
+                Iterator outerResultSet = resultMap.entrySet().iterator();
+                // this loops over the outermost map
+                while(outerResultSet.hasNext()) {
+                    Map.Entry<String, Object> innerResultSet = (Map.Entry<String, Object>) outerResultSet.next();
+                    // this is for edge case handling where innerResultSet is also a Map
+                    if (innerResultSet.getValue() instanceof Map) {
+                        Iterator resultSet = ((Map) innerResultSet.getValue()).entrySet().iterator();
+                        // looping over each result in the inner map
+                        while (resultSet.hasNext()) {
+                            Map.Entry<String, Object> tempResult = (Map.Entry<String, Object>) resultSet.next();
+                            Map<String, Object> tempRetObject = new HashMap<>();
+                            tempRetObject.put(tempResult.getKey(), tempResult.getValue());
+                            SimpleEntry returnObject = new SimpleEntry<String, Object>(tempResult.getKey(), tempRetObject);
+                            Map<String, Object> resultReturnMap = new HashMap<>();
+                            resultReturnMap.put(innerResultSet.getKey(), returnObject);
+                            if (getLogger().isDebugEnabled()) {
+                                getLogger().debug(resultReturnMap.toString());
+                            }
+                            // return the object to the graphQueryResultCallback object
+                            graphQueryResultCallback.process(resultReturnMap, resultSet.hasNext());
+                        }
+                    } else {
+                        // for non-maps, return objects need to be a map<string, object> this simply converts the object
+                        // to a map to be return to the graphQueryResultCallback object
+                        Map<String, Object> resultReturnMap = new HashMap<>();
+                        resultReturnMap.put(innerResultSet.getKey(), innerResultSet.getValue());
+                        graphQueryResultCallback.process(resultReturnMap, false);
+                    }
+                }
+
+            }
+        }
+
+        return new HashMap<>();
+    }
+
+    @Override
+    public String getTransitUrl() {
+        return "memory://localhost/graph";
+    }
+}
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testAttributes.json b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testAttributes.json
new file mode 100644
index 0000000..b8084ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testAttributes.json
@@ -0,0 +1,3 @@
+[ {
+  "testProperty" : "test"
+} ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testComplexFlowFile.json b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testComplexFlowFile.json
new file mode 100644
index 0000000..19852fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testComplexFlowFile.json
@@ -0,0 +1,9 @@
+[ {
+  "L" : [ 1, 2, 3 ]
+}, {
+  "result" : {
+    "1234" : {
+      "1234" : "123"
+    }
+  }
+} ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileContent.json b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileContent.json
new file mode 100644
index 0000000..fefbaf3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileContent.json
@@ -0,0 +1,3 @@
+[ {
+  "M" : 1
+} ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileList.json b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileList.json
new file mode 100644
index 0000000..cad1981
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileList.json
@@ -0,0 +1,3 @@
+[ {
+  "M" : [ 1, 2, 3 ]
+} ]
\ No newline at end of file