You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/13 12:46:40 UTC
nifi git commit: NIFI-1152 NIFI-2117 Fixed standard session impl api
adherance, mock session api adherance,
corrected code and tests for script processors that had issues due to that
process session bug
Repository: nifi
Updated Branches:
refs/heads/master 3c49a9328 -> cfaacb1d5
NIFI-1152 NIFI-2117 Fixed standard session impl api adherance, mock session api adherance, corrected code and tests for script processors that had issues due to that process session bug
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cfaacb1d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cfaacb1d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cfaacb1d
Branch: refs/heads/master
Commit: cfaacb1d5c3726b6027934a30bc8108afd147773
Parents: 3c49a93
Author: joewitt <jo...@apache.org>
Authored: Tue Jul 12 23:16:19 2016 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jul 13 08:45:57 2016 -0400
----------------------------------------------------------------------
.../apache/nifi/util/MockProcessSession.java | 8 +++++
.../nifi/util/TestMockProcessSession.java | 21 ++++++++++++
.../repository/StandardProcessSession.java | 12 +++++--
.../repository/TestStandardProcessSession.java | 34 +++++++++++++++++++-
.../script/AbstractScriptProcessor.java | 17 ++--------
.../script/InvokeScriptedProcessor.java | 16 ++++-----
.../processors/script/TestInvokeGroovy.java | 8 ++---
.../groovy/testScriptRoutesToFailure.groovy | 9 ++++--
.../test/resources/groovy/test_reader.groovy | 2 +-
9 files changed, 93 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 66db49a..5bc23f9 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -62,6 +62,7 @@ public class MockProcessSession implements ProcessSession {
private final MockFlowFileQueue processorQueue;
private final Set<Long> beingProcessed = new HashSet<>();
private final List<MockFlowFile> penalized = new ArrayList<>();
+ private final Processor processor;
private final Map<Long, MockFlowFile> currentVersions = new HashMap<>();
private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
@@ -77,6 +78,7 @@ public class MockProcessSession implements ProcessSession {
private int removedCount = 0;
public MockProcessSession(final SharedSessionState sharedState, final Processor processor) {
+ this.processor = processor;
this.sharedState = sharedState;
this.processorQueue = sharedState.getFlowFileQueue();
provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
@@ -650,6 +652,9 @@ public class MockProcessSession implements ProcessSession {
transfer(flowFile);
return;
}
+ if(!processor.getRelationships().contains(relationship)){
+ throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
+ }
validateState(flowFile);
List<MockFlowFile> list = transferMap.get(relationship);
@@ -668,6 +673,9 @@ public class MockProcessSession implements ProcessSession {
transfer(flowFiles);
return;
}
+ if(!processor.getRelationships().contains(relationship)){
+ throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
+ }
for (final FlowFile flowFile : flowFiles) {
validateState(flowFile);
http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
index 2d88351..e16afb3 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -64,6 +64,27 @@ public class TestMockProcessSession {
}
}
+ @Test
+ public void testTransferUnknownRelationship() {
+ final Processor processor = new PoorlyBehavedProcessor();
+ final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor);
+ FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
+ final Relationship fakeRel = new Relationship.Builder().name("FAKE").build();
+ try {
+ session.transfer(ff1, fakeRel);
+ Assert.fail("Should have thrown IllegalArgumentException");
+ } catch (final IllegalArgumentException ie) {
+
+ }
+ try {
+ session.transfer(Collections.singleton(ff1), fakeRel);
+ Assert.fail("Should have thrown IllegalArgumentException");
+ } catch (final IllegalArgumentException ie) {
+
+ }
+
+ }
+
protected static class PoorlyBehavedProcessor extends AbstractProcessor {
private static final Relationship REL_FAILURE = new Relationship.Builder()
http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 6ea5afe..b5da072 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1562,9 +1562,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void transfer(final FlowFile flowFile, final Relationship relationship) {
validateRecordState(flowFile);
- final StandardRepositoryRecord record = records.get(flowFile);
- record.setTransferRelationship(relationship);
- updateLastQueuedDate(record);
final int numDestinations = context.getConnections(relationship).size();
final int multiplier = Math.max(1, numDestinations);
@@ -1575,7 +1572,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
autoTerminated = true;
} else if (numDestinations == 0 && relationship == Relationship.SELF) {
selfRelationship = true;
+ } else if (numDestinations == 0) {
+ // the relationship specified is not known in this session/context
+ throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
}
+ final StandardRepositoryRecord record = records.get(flowFile);
+ record.setTransferRelationship(relationship);
+ updateLastQueuedDate(record);
if (autoTerminated) {
removedCount += multiplier;
@@ -1616,6 +1619,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
autoTerminated = true;
} else if (numDestinations == 0 && relationship == Relationship.SELF) {
selfRelationship = true;
+ } else if (numDestinations == 0) {
+ // the relationship specified is not known in this session/context
+ throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
}
final int multiplier = Math.max(1, numDestinations);
http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 55c9f5a..23a170e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -97,6 +97,7 @@ public class TestStandardProcessSession {
private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo;
+ private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
@After
public void cleanup() {
@@ -187,11 +188,14 @@ public class TestStandardProcessSession {
final Relationship relationship = (Relationship) arguments[0];
if (relationship == Relationship.SELF) {
return Collections.emptySet();
- } else {
+ } else if (relationship == FAKE_RELATIONSHIP || relationship.equals(FAKE_RELATIONSHIP) ){
+ return null;
+ }else {
return new HashSet<>(connList);
}
}
}).when(connectable).getConnections(Mockito.any(Relationship.class));
+
when(connectable.getConnections()).thenReturn(new HashSet<>(connList));
contentRepo = new MockContentRepository();
@@ -1250,6 +1254,34 @@ public class TestStandardProcessSession {
session.commit();
}
+ @Test
+ public void testTransferUnknownRelationship() {
+ final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+ .entryDate(System.currentTimeMillis())
+ .build();
+
+ flowFileQueue.put(flowFileRecord1);
+
+ FlowFile ff1 = session.get();
+ ff1 = session.putAttribute(ff1, "index", "1");
+
+ try {
+ session.transfer(ff1, FAKE_RELATIONSHIP);
+ Assert.fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException iae) {
+ }
+
+ try {
+ final Collection<FlowFile> collection = new HashSet<>();
+ collection.add(ff1);
+ session.transfer(collection, FAKE_RELATIONSHIP);
+ Assert.fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException iae) {
+ }
+ }
+
private static class MockFlowFileRepository implements FlowFileRepository {
private boolean failOnUpdate = false;
private final AtomicLong idGenerator = new AtomicLong(0L);
http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java
index ad41850..8eb8ee0 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java
@@ -16,21 +16,7 @@
*/
package org.apache.nifi.processors.script;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.StringUtils;
-
-import javax.script.ScriptEngine;
-import javax.script.ScriptEngineFactory;
-import javax.script.ScriptEngineManager;
-import javax.script.ScriptException;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
@@ -213,6 +199,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
/**
* Performs common setup operations when the processor is scheduled to run. This method assumes the member
* variables associated with properties have been filled.
+ * @param numberOfScriptEngines number of engines to setup
*/
public void setup(int numberOfScriptEngines) {
@@ -231,6 +218,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
* javax.script APIs. Then, if any script configurators have been defined for this engine, their init() method is
* called, and the configurator is saved for future calls.
*
+ * @param numberOfScriptEngines number of engines to setup
* @see org.apache.nifi.processors.script.ScriptEngineConfigurator
*/
protected void setupEngines(int numberOfScriptEngines) {
@@ -316,6 +304,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
* If the parameter is null or empty, this class's classloader is returned
*
* @param modules An array of URLs to add to the class loader
+ * @return ClassLoader for script engine
*/
protected ClassLoader createScriptEngineModuleClassLoader(URL[] modules) {
ClassLoader thisClassLoader = this.getClass().getClassLoader();
http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
index b4a6c0d..dd3ae58 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
@@ -63,16 +63,14 @@ import org.apache.nifi.processor.util.StandardValidators;
public class InvokeScriptedProcessor extends AbstractScriptProcessor {
private final AtomicReference<Processor> processor = new AtomicReference<>();
- private final AtomicReference<Collection<ValidationResult>> validationResults =
- new AtomicReference<>((Collection<ValidationResult>) new ArrayList<ValidationResult>());
+ private final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>());
private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
private ScriptEngine scriptEngine = null;
/**
- * Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script
- * processor has defined additional relationships, those will be added as well.
+ * Returns the valid relationships for this processor as supplied by the script itself.
*
* @return a Set of Relationships supported by this processor
*/
@@ -82,7 +80,10 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
final Processor instance = processor.get();
if (instance != null) {
try {
- relationships.addAll(instance.getRelationships());
+ final Set<Relationship> rels = instance.getRelationships();
+ if(rels != null && !rels.isEmpty()){
+ relationships.addAll(rels);
+ }
} catch (final Throwable t) {
final ComponentLog logger = getLogger();
final String message = "Unable to get relationships from scripted Processor: " + t;
@@ -92,10 +93,6 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
logger.error(message, t);
}
}
- } else {
- // Return defaults for now
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
}
return Collections.unmodifiableSet(relationships);
}
@@ -493,6 +490,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
}
@OnStopped
+ @Override
public void stop() {
super.stop();
processor.set(null);
http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
index 2dc700d..e0007fe 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
@@ -58,8 +58,8 @@ public class TestInvokeGroovy extends BaseScriptTest {
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
runner.run();
- runner.assertAllFlowFilesTransferred("success", 1);
- final List<MockFlowFile> result = runner.getFlowFilesForRelationship("success");
+ runner.assertAllFlowFilesTransferred("test", 1);
+ final List<MockFlowFile> result = runner.getFlowFilesForRelationship("test");
result.get(0).assertAttributeEquals("from-content", "test content");
}
@@ -166,8 +166,8 @@ public class TestInvokeGroovy extends BaseScriptTest {
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
runner.run();
- runner.assertAllFlowFilesTransferred(InvokeScriptedProcessor.REL_FAILURE, 1);
- final List<MockFlowFile> result = runner.getFlowFilesForRelationship(InvokeScriptedProcessor.REL_FAILURE);
+ runner.assertAllFlowFilesTransferred("FAILURE", 1);
+ final List<MockFlowFile> result = runner.getFlowFilesForRelationship("FAILURE");
assertFalse(result.isEmpty());
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy
index 90d7d6c..3830616 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy
@@ -18,13 +18,18 @@ class testScriptRoutesToFailure implements Processor {
def ComponentLog log
+ def REL_FAILURE = new Relationship.Builder()
+ .name("FAILURE")
+ .description("A FAILURE relationship")
+ .build();
+
@Override
void initialize(ProcessorInitializationContext context) {
}
@Override
Set<Relationship> getRelationships() {
- return [] as Set
+ return [REL_FAILURE] as Set
}
@Override
@@ -32,7 +37,7 @@ class testScriptRoutesToFailure implements Processor {
def session = sessionFactory.createSession()
def flowFile = session.get()
if(!flowFile) return
- session.transfer(flowFile, InvokeScriptedProcessor.REL_FAILURE)
+ session.transfer(flowFile, REL_FAILURE)
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
index 9778f87..414b9de 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
@@ -43,7 +43,7 @@ class GroovyProcessor implements Processor {
}
flowFile = session.putAttribute(flowFile, "from-content", "test content")
// transfer
- session.transfer(flowFile, InvokeScriptedProcessor.REL_SUCCESS)
+ session.transfer(flowFile, REL_TEST)
session.commit()
}