You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2020/12/01 16:15:14 UTC
[nifi] branch main updated: NIFI-7906: parameterized graph query
NIFI-7906: addressing PR concerns NIFI-7906: code styling fixes NIFI-7906:
adding in license information to new files + enables processor in META-INF
NIFI-7906: exclude test files from RAT NIFI-7906: PR refactor to streamline
graph response NIFI-7906: removing ERRORS output Unused after refactor Did
a few cleanups for the contributor.
This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c29cced NIFI-7906: parameterized graph query NIFI-7906: addressing PR concerns NIFI-7906: code styling fixes NIFI-7906: adding in license information to new files + enables processor in META-INF NIFI-7906: exclude test files from RAT NIFI-7906: PR refactor to streamline graph response NIFI-7906: removing ERRORS output Unused after refactor Did a few cleanups for the contributor.
c29cced is described below
commit c29cced269dcce28fb9ba034025d01e76a79b037
Author: Levi Lentz <le...@gmail.com>
AuthorDate: Thu Oct 22 19:00:34 2020 -0400
NIFI-7906: parameterized graph query
NIFI-7906: addressing PR concerns
NIFI-7906: code styling fixes
NIFI-7906: adding in license information to new files
+ enables processor in META-INF
NIFI-7906: exclude test files from RAT
NIFI-7906: PR refactor to streamline graph response
NIFI-7906: removing ERRORS output
Unused after refactor
Did a few cleanups for the contributor.
This closes #4638
Signed-off-by: Mike Thomsen <mt...@apache.org>
---
.../nifi-graph-processors/pom.xml | 55 +++++
.../processors/graph/ExecuteGraphQueryRecord.java | 270 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../graph/ExecuteGraphQueryRecordTest.java | 176 ++++++++++++++
.../processors/graph/util/InMemoryGraphClient.java | 113 +++++++++
.../src/test/resources/testAttributes.json | 3 +
.../src/test/resources/testComplexFlowFile.json | 9 +
.../src/test/resources/testFlowFileContent.json | 3 +
.../src/test/resources/testFlowFileList.json | 3 +
9 files changed, 633 insertions(+)
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
index 9b68f45..1a9b6f6 100644
--- a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
@@ -78,5 +78,60 @@
<version>1.13.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <!-- Record path support; ExecuteGraphQueryRecord imports org.apache.nifi.record.path.* -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-path</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <!-- NOTE(review): nifi-record-serialization-service-api is declared twice in this list (here with
+ scope "provided" and no version, and again below with a version and no scope). Confirm which
+ declaration is intended and remove the other. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-json-utils</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- NOTE(review): second declaration of nifi-record-serialization-service-api (see above). -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- NOTE(review): no scope is given, so this mock utility is not limited to tests; in this change
+ it appears to be needed only by ExecuteGraphQueryRecordTest (MockRecordWriter). Confirm whether
+ test scope was intended. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
+ <!-- NOTE(review): no scope is given; confirm whether this API should be "provided" like other
+ service APIs. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
+
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <!-- The JSON fixtures below are compared against processor output by
+ ExecuteGraphQueryRecordTest and carry no license header, so they are excluded from the
+ RAT check. -->
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>src/test/resources/testAttributes.json</exclude>
+ <exclude>src/test/resources/testComplexFlowFile.json</exclude>
+ <exclude>src/test/resources/testFlowFileContent.json</exclude>
+ <exclude>src/test/resources/testFlowFileList.json</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
new file mode 100644
index 0000000..e750b6c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.graph.GraphClientService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+/**
+ * Executes a graph script once for every record of the incoming flow file.
+ *
+ * <p>Each dynamic property value is compiled as a RecordPath and evaluated against the current
+ * record; the selected non-null values are bound to the script as a list named after the property.
+ * The flow file attributes are bound as well and, because they are added last, win over a dynamic
+ * property of the same name. The responses collected for a record are written as pretty-printed
+ * JSON to a child flow file routed to {@link #GRAPH}. The incoming flow file is routed to
+ * {@link #SUCCESS}, or to {@link #FAILURE} (with every child flow file removed) as soon as any
+ * record fails.</p>
+ */
+@Tags({"graph", "gremlin"})
+@CapabilityDescription("This uses a flowfile as input to perform graph mutations.")
+@WritesAttributes({
+        @WritesAttribute(attribute = ExecuteGraphQueryRecord.GRAPH_OPERATION_TIME, description = "The amount of time it took to execute all of the graph operations."),
+        @WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT, description = "The number of records processed")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "A dynamic property to be used as a parameter in the graph script",
+        value = "The variable name to be set", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Uses a record path to set a variable as a parameter in the graph script")
+public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
+
+    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("client-service")
+            .displayName("Client Service")
+            .description("The graph client service for connecting to a graph database.")
+            .identifiesControllerService(GraphClientService.class)
+            .addValidator(Validator.VALID)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor READER_SERVICE = new PropertyDescriptor.Builder()
+            .name("reader-service")
+            .displayName("Record Reader")
+            .description("The record reader to use with this processor.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    // NOTE(review): the writer factory is resolved in onScheduled but nothing in this class writes
+    // records with it; failed input is routed to FAILURE unchanged.
+    public static final PropertyDescriptor WRITER_SERVICE = new PropertyDescriptor.Builder()
+            .name("writer-service")
+            .displayName("Failed Record Writer")
+            .description("The record writer to use for writing failed records.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor SUBMISSION_SCRIPT = new PropertyDescriptor.Builder()
+            .name("record-script")
+            .displayName("Graph Record Script")
+            .description("Script to perform the business logic on graph, using flow file attributes and custom properties " +
+                    "as variable-value pairs in its logic.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+            CLIENT_SERVICE, READER_SERVICE, WRITER_SERVICE, SUBMISSION_SCRIPT
+    ));
+
+    public static final Relationship SUCCESS = new Relationship.Builder().name("original")
+            .description("Original flow files that successfully interacted with " +
+                    "graph server.")
+            .build();
+    public static final Relationship FAILURE = new Relationship.Builder().name("failure")
+            .description("Flow files that fail to interact with graph server.")
+            .build();
+    public static final Relationship GRAPH = new Relationship.Builder().name("response")
+            .description("The response object from the graph server.")
+            .autoTerminateDefault(true)
+            .build();
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            SUCCESS, FAILURE, GRAPH
+    )));
+
+    /** Attribute holding the number of records that were read and submitted to the graph. */
+    public static final String RECORD_COUNT = "records.count";
+    /** Attribute holding the time, in whole seconds, spent processing all records. */
+    public static final String GRAPH_OPERATION_TIME = "graph.operations.took";
+
+    // Serializes graph responses to JSON; one instance is shared by onTrigger and executeQuery.
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    // Assigned in onScheduled and read in onTrigger; volatile like recordPathCache so the values
+    // written by the scheduling thread are visible to the threads running onTrigger.
+    private volatile RecordPathCache recordPathCache;
+    private volatile GraphClientService clientService;
+    private volatile RecordReaderFactory recordReaderFactory;
+    private volatile RecordSetWriterFactory recordSetWriterFactory;
+
+    /**
+     * Describes a dynamic property: its name must be a valid attribute key and its value, which
+     * supports expression language against flow file attributes, must not be empty.
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .build();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    /** Resolves the configured controller services and creates a fresh record path cache. */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class);
+        recordReaderFactory = context.getProperty(READER_SERVICE).asControllerService(RecordReaderFactory.class);
+        recordSetWriterFactory = context.getProperty(WRITER_SERVICE).asControllerService(RecordSetWriterFactory.class);
+        recordPathCache = new RecordPathCache(100);
+    }
+
+    /**
+     * Evaluates a record path against a record.
+     *
+     * @return the non-null values of every selected field; always a list, possibly empty
+     */
+    private List<Object> getRecordValue(Record record, RecordPath recordPath) {
+        final RecordPathResult result = recordPath.evaluate(record);
+        return result.getSelectedFields()
+                .filter(fv -> fv.getValue() != null)
+                .map(FieldValue::getValue)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Compiles the value of every dynamic property into a record path.
+     *
+     * @param context  supplies the configured properties
+     * @param flowFile flow file whose attributes are used to evaluate expression language
+     * @return compiled record paths keyed by dynamic property name
+     */
+    private Map<String, RecordPath> compileDynamicRecordPaths(final ProcessContext context, final FlowFile flowFile) {
+        final Map<String, RecordPath> compiled = new HashMap<>();
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (descriptor.isDynamic()) {
+                final String path = context.getProperty(descriptor.getName())
+                        .evaluateAttributeExpressions(flowFile)
+                        .getValue();
+                compiled.put(descriptor.getName(), recordPathCache.getCompiled(path));
+            }
+        }
+        return compiled;
+    }
+
+    /**
+     * Reads the incoming flow file record by record, runs the graph script for each record and
+     * writes the responses of each record to its own child flow file.
+     *
+     * @throws ProcessException declared by the processor contract; failures while reading records,
+     *                          running the script or writing a response are caught here and route
+     *                          the input to {@link #FAILURE}
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        final String recordScript = context.getProperty(SUBMISSION_SCRIPT)
+                .evaluateAttributeExpressions(input)
+                .getValue();
+        final Map<String, RecordPath> dynamic = compileDynamicRecordPaths(context, input);
+
+        // Every child flow file created below, so that all of them can be removed on failure.
+        final List<FlowFile> graphList = new ArrayList<>();
+        boolean failed = false;
+        long recordCount = 0;
+        long elapsedMillis = 0;
+
+        try (InputStream is = session.read(input);
+             RecordReader reader = recordReaderFactory.createRecordReader(input, is, getLogger())) {
+            final long start = System.currentTimeMillis();
+            Record record;
+            while ((record = reader.nextRecord()) != null) {
+                final Map<String, Object> parameters = new HashMap<>();
+                for (final Map.Entry<String, RecordPath> entry : dynamic.entrySet()) {
+                    parameters.put(entry.getKey(), getRecordValue(record, entry.getValue()));
+                }
+                // Attributes are added last and therefore override same-named dynamic properties.
+                parameters.putAll(input.getAttributes());
+
+                final FlowFile graph = session.create(input);
+                // Track the child before anything can throw; an untracked child would be neither
+                // removed nor transferred when the query or the write fails.
+                graphList.add(graph);
+
+                final List<Map<String, Object>> graphResponses = executeQuery(recordScript, parameters);
+                final String graphOutput = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(graphResponses);
+                try (OutputStream graphOutputStream = session.write(graph)) {
+                    graphOutputStream.write(graphOutput.getBytes(StandardCharsets.UTF_8));
+                }
+                recordCount++;
+            }
+            elapsedMillis = System.currentTimeMillis() - start;
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug(String.format("Took %s seconds.", elapsedMillis / 1000));
+            }
+        } catch (Exception ex) {
+            getLogger().error("Failed to execute the graph script for " + input, ex);
+            failed = true;
+        }
+
+        // Routing happens only after both streams above have been closed.
+        if (failed) {
+            graphList.forEach(session::remove);
+            session.transfer(input, FAILURE);
+        } else {
+            // The attribute keeps its whole-second unit; provenance gets the exact duration.
+            input = session.putAttribute(input, GRAPH_OPERATION_TIME, String.valueOf(elapsedMillis / 1000));
+            input = session.putAttribute(input, RECORD_COUNT, String.valueOf(recordCount));
+            session.getProvenanceReporter().send(input, clientService.getTransitUrl(), elapsedMillis);
+            session.transfer(input, SUCCESS);
+            graphList.forEach(it -> session.transfer(it, GRAPH));
+        }
+    }
+
+    /**
+     * Runs the script through the graph client service and collects every response map it reports.
+     *
+     * @param recordScript script to execute
+     * @param parameters   variables made available to the script
+     * @return the response maps in the order the client service delivered them
+     */
+    private List<Map<String, Object>> executeQuery(String recordScript, Map<String, Object> parameters) {
+        final List<Map<String, Object>> graphResponses = new ArrayList<>();
+        clientService.executeQuery(recordScript, parameters, (map, b) -> {
+            if (getLogger().isDebugEnabled()) {
+                try {
+                    getLogger().debug(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map));
+                } catch (JsonProcessingException ex) {
+                    getLogger().error("Error converting map to JSON ", ex);
+                }
+            }
+            graphResponses.add(map);
+        });
+        return graphResponses;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 615c785..2ab7e95 100644
--- a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,3 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.graph.ExecuteGraphQuery
+org.apache.nifi.processors.graph.ExecuteGraphQueryRecord
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
new file mode 100644
index 0000000..8c6cb96
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph;
+
+import groovy.json.JsonOutput;
+import org.apache.nifi.processors.graph.util.InMemoryGraphClient;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Tests {@link ExecuteGraphQueryRecord} against an {@link InMemoryGraphClient}. Every test enqueues
+ * a JSON array holding a single record and compares the one flow file routed to the GRAPH
+ * relationship with an expected JSON resource.
+ */
+public class ExecuteGraphQueryRecordTest {
+    private TestRunner runner;
+    private JsonTreeReader reader;
+    private InMemoryGraphClient graphClient;
+    // Default attributes of the enqueued flow file; filled in setup().
+    Map<String, String> enqueProperties = new HashMap<>();
+
+    /** Wires reader, writer and in-memory graph client into a fresh runner with a default script. */
+    @Before
+    public void setup() throws InitializationException {
+        MockRecordWriter writer = new MockRecordWriter();
+        reader = new JsonTreeReader();
+        runner = TestRunners.newTestRunner(ExecuteGraphQueryRecord.class);
+        runner.addControllerService("reader", reader);
+        runner.addControllerService("writer", writer);
+        runner.setProperty(ExecuteGraphQueryRecord.READER_SERVICE, "reader");
+        runner.setProperty(ExecuteGraphQueryRecord.WRITER_SERVICE, "writer");
+
+        runner.enableControllerService(writer);
+        runner.enableControllerService(reader);
+
+        graphClient = new InMemoryGraphClient();
+        runner.addControllerService("graphClient", graphClient);
+
+        runner.setProperty(ExecuteGraphQueryRecord.CLIENT_SERVICE, "graphClient");
+        runner.enableControllerService(graphClient);
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[ 'testProperty': 'testResponse' ]");
+        runner.assertValid();
+        enqueProperties.put("graph.name", "graph");
+    }
+
+    /** A scalar field selected through a record path is returned by the script. */
+    @Test
+    public void testFlowFileContent() throws IOException {
+        Map<String, Object> record = new HashMap<>();
+        record.put("M", 1);
+
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[ 'M': M[0] ]");
+        runner.setProperty("M", "/M");
+
+        assertSingleGraphResponse(record, enqueProperties, "/testFlowFileContent.json");
+    }
+
+    /** A list field selected through a record path is returned intact by the script. */
+    @Test
+    public void testFlowFileList() throws IOException {
+        Map<String, Object> record = new HashMap<>();
+        record.put("M", oneTwoThree());
+
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[ 'M': M[0] ]");
+        runner.setProperty("M", "/M");
+
+        assertSingleGraphResponse(record, enqueProperties, "/testFlowFileList.json");
+    }
+
+    /** A script response that mixes a list entry with a nested map entry. */
+    @Test
+    public void testComplexFlowFile() throws IOException {
+        Map<String, Object> record = new HashMap<>();
+        record.put("tMap", "123");
+        record.put("L", oneTwoThree());
+
+        String submissionScript = "Map<String, Object> vertexHashes = new HashMap()\n" +
+                "vertexHashes.put('1234', tMap[0])\n" +
+                "[ 'L': L[0], 'result': vertexHashes ]";
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
+        runner.setProperty("tMap", "/tMap");
+        runner.setProperty("L", "/L");
+
+        assertSingleGraphResponse(record, enqueProperties, "/testComplexFlowFile.json");
+    }
+
+    /** Flow file attributes are visible to the script as variables. */
+    @Test
+    public void testAttributes() throws IOException {
+        Map<String, Object> record = new HashMap<>();
+        record.put("tMap", "123");
+
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[ 'testProperty': testProperty ] ");
+        // Deliberately not the shared default attributes: only testProperty is set.
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("testProperty", "test");
+
+        assertSingleGraphResponse(record, attributes, "/testAttributes.json");
+    }
+
+    /** Builds a plain mutable list holding 1, 2 and 3. */
+    private static List<Integer> oneTwoThree() {
+        List<Integer> values = new ArrayList<>();
+        values.add(1);
+        values.add(2);
+        values.add(3);
+        return values;
+    }
+
+    /**
+     * Enqueues a JSON array holding the given record, runs the processor once and verifies that
+     * exactly one flow file reached GRAPH and SUCCESS, none reached FAILURE, and that the GRAPH
+     * content equals the named classpath resource.
+     */
+    private void assertSingleGraphResponse(Map<String, Object> record, Map<String, String> attributes,
+                                           String expectedResource) throws IOException {
+        List<Map<String, Object>> records = new ArrayList<>();
+        records.add(record);
+
+        // Enqueue the JSON text itself so the test does not encode it with the platform default charset.
+        runner.enqueue(JsonOutput.toJson(records), attributes);
+        runner.run();
+
+        runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
+        MockFlowFile relGraph = runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
+        relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream(expectedResource));
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
new file mode 100644
index 0000000..a12d5b8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph.util;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.graph.GraphClientService;
+import org.apache.nifi.graph.GraphQueryResultCallback;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.janusgraph.core.JanusGraph;
+import org.janusgraph.core.JanusGraphFactory;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.AbstractMap.SimpleEntry;
+
+/**
+ * Test double for {@link GraphClientService}: evaluates the query as a Groovy script, with an
+ * in-memory JanusGraph bound as {@code graph} and its traversal source bound as {@code g}, and
+ * reports the entries of a Map-valued script result to the callback one at a time.
+ */
+public class InMemoryGraphClient extends AbstractControllerService implements GraphClientService {
+    private Graph graph;
+
+    @OnEnabled
+    void onEnabled(ConfigurationContext context) {
+        graph = buildGraph();
+    }
+
+    /** Opens a JanusGraph instance backed by the "inmemory" storage backend. */
+    private static JanusGraph buildGraph() {
+        return JanusGraphFactory.build().set("storage.backend", "inmemory").open();
+    }
+
+    /**
+     * Evaluates {@code query} with every parameter bound as a script variable.
+     *
+     * <p>If the script returns a Map, each top-level entry is reported to the callback: an entry
+     * whose value is itself a Map is reported once per inner entry, any other entry is reported
+     * once as a single-entry map. A result that is not a Map is not reported at all.</p>
+     *
+     * @return always a new, empty map
+     * @throws ProcessException wrapping the ScriptException if the script fails
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public Map<String, String> executeQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback graphQueryResultCallback) {
+        final ScriptEngine engine = new ScriptEngineManager().getEngineByName("groovy");
+        for (final Map.Entry<String, Object> parameter : parameters.entrySet()) {
+            engine.put(parameter.getKey(), parameter.getValue());
+        }
+
+        // The graph may not have been built yet if onEnabled has not run.
+        if (graph == null) {
+            graph = buildGraph();
+        }
+        // Bound after the parameters, so these two names always refer to the graph.
+        engine.put("graph", graph);
+        engine.put("g", graph.traversal());
+
+        final Object response;
+        try {
+            response = engine.eval(query);
+        } catch (ScriptException ex) {
+            throw new ProcessException(ex);
+        }
+
+        if (response instanceof Map) {
+            final Map<String, Object> topLevel = (Map<String, Object>) response;
+            for (final Map.Entry<String, Object> outer : topLevel.entrySet()) {
+                if (outer.getValue() instanceof Map) {
+                    // Nested map: report every inner entry separately, wrapped under the outer key.
+                    final Iterator<Map.Entry<String, Object>> innerEntries =
+                            ((Map<String, Object>) outer.getValue()).entrySet().iterator();
+                    while (innerEntries.hasNext()) {
+                        final Map.Entry<String, Object> inner = innerEntries.next();
+
+                        final Map<String, Object> innerAsMap = new HashMap<>();
+                        innerAsMap.put(inner.getKey(), inner.getValue());
+                        final SimpleEntry<String, Object> wrapped = new SimpleEntry<>(inner.getKey(), innerAsMap);
+
+                        final Map<String, Object> reported = new HashMap<>();
+                        reported.put(outer.getKey(), wrapped);
+                        if (getLogger().isDebugEnabled()) {
+                            getLogger().debug(reported.toString());
+                        }
+                        // The flag tells the callback whether more inner entries follow.
+                        graphQueryResultCallback.process(reported, innerEntries.hasNext());
+                    }
+                } else {
+                    // Any other value: the callback expects a map, so wrap the single entry.
+                    final Map<String, Object> reported = new HashMap<>();
+                    reported.put(outer.getKey(), outer.getValue());
+                    graphQueryResultCallback.process(reported, false);
+                }
+            }
+        }
+
+        return new HashMap<>();
+    }
+
+    @Override
+    public String getTransitUrl() {
+        return "memory://localhost/graph";
+    }
+}
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testAttributes.json b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testAttributes.json
new file mode 100644
index 0000000..b8084ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testAttributes.json
@@ -0,0 +1,3 @@
+[ {
+ "testProperty" : "test"
+} ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testComplexFlowFile.json b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testComplexFlowFile.json
new file mode 100644
index 0000000..19852fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testComplexFlowFile.json
@@ -0,0 +1,9 @@
+[ {
+ "L" : [ 1, 2, 3 ]
+}, {
+ "result" : {
+ "1234" : {
+ "1234" : "123"
+ }
+ }
+} ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileContent.json b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileContent.json
new file mode 100644
index 0000000..fefbaf3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileContent.json
@@ -0,0 +1,3 @@
+[ {
+ "M" : 1
+} ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileList.json b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileList.json
new file mode 100644
index 0000000..cad1981
--- /dev/null
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileList.json
@@ -0,0 +1,3 @@
+[ {
+ "M" : [ 1, 2, 3 ]
+} ]
\ No newline at end of file