You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/14 17:58:54 UTC
[nifi] 03/03: NIFI-7014: This closes #3985. Add RecordReader/Writer
access in ExecuteGroovyScript
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 824cc0ed77b93d69a33358b1e8323102f30837ff
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Tue Jan 14 09:56:43 2020 -0500
NIFI-7014: This closes #3985. Add RecordReader/Writer access in ExecuteGroovyScript
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../nifi-groovyx-processors/pom.xml | 17 ++++++
.../processors/groovyx/ExecuteGroovyScript.java | 65 ++++++++++++++++------
.../additionalDetails.html | 12 ++++
.../groovyx/ExecuteGroovyScriptTest.java | 44 +++++++++++++++
.../groovy/test_record_reader_writer.groovy | 45 +++++++++++++++
5 files changed, 166 insertions(+), 17 deletions(-)
diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml
index 94589bd..3adefbc 100644
--- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml
@@ -37,6 +37,11 @@
<version>1.11.0-SNAPSHOT</version>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-json</artifactId>
<version>${nifi.groovy.version}</version>
@@ -66,6 +71,18 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.12.1.1</version>
diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
index f67df2b..5516aeb 100644
--- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
+++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
@@ -46,6 +46,8 @@ import org.apache.nifi.processors.groovyx.flow.GroovyProcessSessionWrap;
import org.apache.nifi.processors.groovyx.sql.OSql;
import org.apache.nifi.processors.groovyx.util.Files;
import org.apache.nifi.processors.groovyx.util.Validators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.codehaus.groovy.runtime.ResourceGroovyMethods;
import org.codehaus.groovy.runtime.StackTraceUtils;
@@ -80,8 +82,9 @@ import java.util.Set;
@DynamicProperty(name = "A script engine property to update",
value = "The value to set it to",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
- description = "Updates a script engine property specified by the Dynamic Property's key with the value "
- + "specified by the Dynamic Property's value. Use `CTL.` to access any controller services.")
+ description = "Updates a script engine property specified by the Dynamic Property's key with the value specified by the Dynamic Property's value. "
+ + "Use `CTL.` to access any controller services, `SQL.` to access any DBCPServices, `RecordReader.` to access RecordReaderFactory instances, or "
+ + "`RecordWriter.` to access any RecordSetWriterFactory instances.")
public class ExecuteGroovyScript extends AbstractProcessor {
public static final String GROOVY_CLASSPATH = "${groovy.classes.path}";
@@ -335,9 +338,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
/**
* init SQL variables from DBCP services
*/
- @SuppressWarnings("unchecked")
- private void onInitSQL(HashMap SQL) throws SQLException {
- for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
+ private void onInitSQL(Map<String, Object> SQL) throws SQLException {
+ for (Map.Entry<String, Object> e : SQL.entrySet()) {
DBCPService s = (DBCPService) e.getValue();
OSql sql = new OSql(s.getConnection(Collections.emptyMap()));
//try to set autocommit to false
@@ -355,9 +357,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
/**
* before commit SQL services
*/
- @SuppressWarnings("unchecked")
- private void onCommitSQL(HashMap SQL) throws SQLException {
- for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
+ private void onCommitSQL(Map<String, Object> SQL) throws SQLException {
+ for (Map.Entry<String, Object> e : SQL.entrySet()) {
OSql sql = (OSql) e.getValue();
if (!sql.getConnection().getAutoCommit()) {
sql.commit();
@@ -368,9 +369,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
/**
* finalize SQL services. no exceptions should be thrown.
*/
- @SuppressWarnings("unchecked")
- private void onFinitSQL(HashMap SQL) {
- for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
+ private void onFinitSQL(Map<String, Object> SQL) {
+ for (Map.Entry<String, Object> e : SQL.entrySet()) {
OSql sql = (OSql) e.getValue();
try {
if (!sql.getConnection().getAutoCommit()) {
@@ -391,9 +391,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
/**
* exception SQL services
*/
- @SuppressWarnings("unchecked")
- private void onFailSQL(HashMap SQL) {
- for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
+ private void onFailSQL(Map<String, Object> SQL) {
+ for (Map.Entry<String, Object> e : SQL.entrySet()) {
OSql sql = (OSql) e.getValue();
try {
if (!sql.getConnection().getAutoCommit()) {
@@ -412,8 +411,10 @@ public class ExecuteGroovyScript extends AbstractProcessor {
//so transfer original input to failure will be possible
GroovyProcessSessionWrap session = new GroovyProcessSessionWrap(_session, toFailureOnError);
- HashMap CTL = new AccessMap("CTL");
- HashMap SQL = new AccessMap("SQL");
+ Map<String, Object> CTL = new AccessMap("CTL");
+ Map<String, Object> SQL = new AccessMap("SQL");
+ Map<String, Object> RECORD_READER = new AccessMap("RecordReader");
+ Map<String, Object> RECORD_SET_WRITER = new AccessMap("RecordSetWriter");
try {
Script script = getGroovyScript(); //compilation must be moved to validation
@@ -431,6 +432,14 @@ public class ExecuteGroovyScript extends AbstractProcessor {
} else if (property.getKey().getName().startsWith("SQL.")) {
DBCPService dbcp = context.getProperty(property.getKey()).asControllerService(DBCPService.class);
SQL.put(property.getKey().getName().substring(4), dbcp);
+ } else if (property.getKey().getName().startsWith("RecordReader.")) {
+ // Get RecordReaderFactory controller service
+ RecordReaderFactory recordReader = context.getProperty(property.getKey()).asControllerService(RecordReaderFactory.class);
+ RECORD_READER.put(property.getKey().getName().substring(13), recordReader);
+ } else if (property.getKey().getName().startsWith("RecordWriter.")) {
+ // Get RecordWriterFactory controller service
+ RecordSetWriterFactory recordWriter = context.getProperty(property.getKey()).asControllerService(RecordSetWriterFactory.class);
+ RECORD_SET_WRITER.put(property.getKey().getName().substring(13), recordWriter);
} else {
// Add the dynamic property bound to its full PropertyValue to the script engine
if (property.getValue() != null) {
@@ -448,6 +457,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
bindings.put("REL_FAILURE", REL_FAILURE);
bindings.put("CTL", CTL);
bindings.put("SQL", SQL);
+ bindings.put("RecordReader", RECORD_READER);
+ bindings.put("RecordWriter", RECORD_SET_WRITER);
script.run();
bindings.clear();
@@ -496,6 +507,26 @@ public class ExecuteGroovyScript extends AbstractProcessor {
.identifiesControllerService(DBCPService.class)
.build();
}
+ if (propertyDescriptorName.startsWith("RecordReader.")) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .displayName(propertyDescriptorName)
+ .required(false)
+ .description("RecordReaderFactory controller service accessible from code as `" + propertyDescriptorName + "`")
+ .dynamic(true)
+ .identifiesControllerService(RecordReaderFactory.class)
+ .build();
+ }
+ if (propertyDescriptorName.startsWith("RecordWriter.")) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .displayName(propertyDescriptorName)
+ .required(false)
+ .description("RecordSetWriterFactory controller service accessible from code as `" + propertyDescriptorName + "`")
+ .dynamic(true)
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .build();
+ }
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
@@ -506,7 +537,7 @@ public class ExecuteGroovyScript extends AbstractProcessor {
}
/** simple HashMap with exception on access of non-existent key */
- private class AccessMap extends HashMap {
+ private static class AccessMap extends HashMap<String,Object> {
private String parentKey;
AccessMap(String parentKey){
this.parentKey=parentKey;
diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html
index 3adbb36..5239aac 100644
--- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html
@@ -65,6 +65,18 @@
<br/>The `SQL.` prefixed properties can be linked only to DBCPService.</td>
</tr>
<tr>
+ <td>RecordReader</td>
+ <td>java.util.HashMap&lt;String,<a href="https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java">RecordReaderFactory</a>&gt;</td>
+ <td>Map populated with controller services defined with `RecordReader.*` processor properties.
+ <br/>The `RecordReader.` prefixed properties are to be linked to RecordReaderFactory controller service instances.</td>
+</tr>
+<tr>
+ <td>RecordWriter</td>
+ <td>java.util.HashMap&lt;String,<a href="https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java">RecordSetWriterFactory</a>&gt;</td>
+ <td>Map populated with controller services defined with `RecordWriter.*` processor properties.
+ <br/>The `RecordWriter.` prefixed properties are to be linked to RecordSetWriterFactory controller service instances.</td>
+</tr>
+<tr>
<td>Dynamic processor properties</td>
<td>org.apache.nifi.components.PropertyDescriptor</td>
<td>All processor properties not starting with `CTL.`, `SQL.`, `RecordReader.` or `RecordWriter.` are bound to script variables</td>
diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java
index b2b3e2d..ddba34a 100644
--- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java
+++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java
@@ -16,6 +16,13 @@
*/
package org.apache.nifi.processors.groovyx;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
@@ -38,6 +45,7 @@ import java.io.FileInputStream;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.List;
import java.util.HashMap;
@@ -61,6 +69,9 @@ public class ExecuteGroovyScriptTest {
protected TestRunner runner;
protected static DBCPService dbcp = null; //to make single initialization
+ protected MockRecordParser recordParser = null;
+ protected RecordSetWriterFactory recordWriter = null;
+ protected RecordSchema recordSchema = null;
protected ExecuteGroovyScript proc;
public final String TEST_RESOURCE_LOCATION = "target/test/resources/groovy/";
private final String TEST_CSV_DATA = "gender,title,first,last\n"
@@ -121,6 +132,21 @@ public class ExecuteGroovyScriptTest {
runner = TestRunners.newTestRunner(proc);
runner.addControllerService("dbcp", dbcp, new HashMap<>());
runner.enableControllerService(dbcp);
+
+ List<RecordField> recordFields = Arrays.asList(
+ new RecordField("id", RecordFieldType.INT.getDataType()),
+ new RecordField("name", RecordFieldType.STRING.getDataType()),
+ new RecordField("code", RecordFieldType.INT.getDataType()));
+ recordSchema = new SimpleRecordSchema(recordFields);
+
+ recordParser = new MockRecordParser();
+ recordFields.forEach((r) -> recordParser.addSchemaField(r));
+ runner.addControllerService("myreader", recordParser, new HashMap<>());
+ runner.enableControllerService(recordParser);
+
+ recordWriter = new MockRecordWriter();
+ runner.addControllerService("mywriter", recordWriter, new HashMap<>());
+ runner.enableControllerService(recordWriter);
}
/**
@@ -225,6 +251,7 @@ public class ExecuteGroovyScriptTest {
runner.setProperty(proc.SCRIPT_BODY, " { { ");
runner.assertNotValid();
}
+
//---------------------------------------------------------
@Test
public void test_ctl_01_access() throws Exception {
@@ -310,6 +337,23 @@ public class ExecuteGroovyScriptTest {
}
@Test
+    public void test_record_reader_writer_access() throws Exception {
+        runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_record_reader_writer.groovy");
+        // bind the "myreader"/"mywriter" services so the script sees them as RecordReader.myreader / RecordWriter.mywriter
+        runner.setProperty("RecordReader.myreader", "myreader");
+        runner.setProperty("RecordWriter.mywriter", "mywriter");
+        runner.assertValid();
+
+        recordParser.addRecord(1, "A", "XYZ");
+        runner.enqueue("");
+        runner.run();
+
+        final String successName = ExecuteGroovyScript.REL_SUCCESS.getName();
+        runner.assertAllFlowFilesTransferred(successName, 1);
+        runner.getFlowFilesForRelationship(successName).get(0).assertContentEquals("\"1\",\"A\",\"XYZ\"\n", "UTF-8");
+    }
+
+ @Test
public void test_filter_01() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get{it.FILTER=='3'}; if(!ff)return; REL_SUCCESS << ff;");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_record_reader_writer.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_record_reader_writer.groovy
new file mode 100644
index 0000000..aec564d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_record_reader_writer.groovy
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.nifi.logging.ComponentLog
+import org.apache.nifi.serialization.RecordReaderFactory
+import org.apache.nifi.processor.io.StreamCallback
+import org.apache.nifi.serialization.RecordSetWriterFactory
+
+// Check that the RecordReader/RecordWriter services are bound, then copy every record through them.
+def ff = session.get()
+if (!ff) return
+def readerFactory = RecordReader.myreader
+assert readerFactory instanceof RecordReaderFactory
+def writerFactory = RecordWriter.mywriter
+assert writerFactory instanceof RecordSetWriterFactory
+session.write(ff, { inStream, outStream ->
+    def variables = new HashMap<String, String>(ff.attributes)
+    // withCloseable closes the reader and the writer even if reading or writing throws
+    readerFactory.createRecordReader(variables, inStream, -1L, log).withCloseable { recordReader ->
+        writerFactory.createWriter(log, recordReader.schema, outStream, variables).withCloseable { recordWriter ->
+            recordWriter.beginRecordSet()
+            def record
+            while ((record = recordReader.nextRecord()) != null) {
+                recordWriter.write(record)
+            }
+            recordWriter.finishRecordSet()
+        }
+    }
+} as StreamCallback)
+REL_SUCCESS << ff