You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/13 12:46:40 UTC

nifi git commit: NIFI-1152 NIFI-2117 Fixed standard session impl api adherence, mock session api adherence, corrected code and tests for script processors that had issues due to that process session bug

Repository: nifi
Updated Branches:
  refs/heads/master 3c49a9328 -> cfaacb1d5


NIFI-1152 NIFI-2117 Fixed standard session impl api adherence, mock session api adherence, corrected code and tests for script processors that had issues due to that process session bug


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cfaacb1d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cfaacb1d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cfaacb1d

Branch: refs/heads/master
Commit: cfaacb1d5c3726b6027934a30bc8108afd147773
Parents: 3c49a93
Author: joewitt <jo...@apache.org>
Authored: Tue Jul 12 23:16:19 2016 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jul 13 08:45:57 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/util/MockProcessSession.java    |  8 +++++
 .../nifi/util/TestMockProcessSession.java       | 21 ++++++++++++
 .../repository/StandardProcessSession.java      | 12 +++++--
 .../repository/TestStandardProcessSession.java  | 34 +++++++++++++++++++-
 .../script/AbstractScriptProcessor.java         | 17 ++--------
 .../script/InvokeScriptedProcessor.java         | 16 ++++-----
 .../processors/script/TestInvokeGroovy.java     |  8 ++---
 .../groovy/testScriptRoutesToFailure.groovy     |  9 ++++--
 .../test/resources/groovy/test_reader.groovy    |  2 +-
 9 files changed, 93 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 66db49a..5bc23f9 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -62,6 +62,7 @@ public class MockProcessSession implements ProcessSession {
     private final MockFlowFileQueue processorQueue;
     private final Set<Long> beingProcessed = new HashSet<>();
     private final List<MockFlowFile> penalized = new ArrayList<>();
+    private final Processor processor;
 
     private final Map<Long, MockFlowFile> currentVersions = new HashMap<>();
     private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
@@ -77,6 +78,7 @@ public class MockProcessSession implements ProcessSession {
     private int removedCount = 0;
 
     public MockProcessSession(final SharedSessionState sharedState, final Processor processor) {
+        this.processor = processor;
         this.sharedState = sharedState;
         this.processorQueue = sharedState.getFlowFileQueue();
         provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
@@ -650,6 +652,9 @@ public class MockProcessSession implements ProcessSession {
             transfer(flowFile);
             return;
         }
+        if(!processor.getRelationships().contains(relationship)){
+            throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
+        }
 
         validateState(flowFile);
         List<MockFlowFile> list = transferMap.get(relationship);
@@ -668,6 +673,9 @@ public class MockProcessSession implements ProcessSession {
             transfer(flowFiles);
             return;
         }
+        if(!processor.getRelationships().contains(relationship)){
+            throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
+        }
 
         for (final FlowFile flowFile : flowFiles) {
             validateState(flowFile);

http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
index 2d88351..e16afb3 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -64,6 +64,27 @@ public class TestMockProcessSession {
         }
     }
 
+    @Test
+    public void testTransferUnknownRelationship() {
+        final Processor processor = new PoorlyBehavedProcessor();
+        final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor);
+        FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
+        final Relationship fakeRel = new Relationship.Builder().name("FAKE").build();
+        try {
+            session.transfer(ff1, fakeRel);
+            Assert.fail("Should have thrown IllegalArgumentException");
+        } catch (final IllegalArgumentException ie) {
+
+        }
+        try {
+            session.transfer(Collections.singleton(ff1), fakeRel);
+            Assert.fail("Should have thrown IllegalArgumentException");
+        } catch (final IllegalArgumentException ie) {
+
+        }
+
+    }
+
     protected static class PoorlyBehavedProcessor extends AbstractProcessor {
 
         private static final Relationship REL_FAILURE = new Relationship.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 6ea5afe..b5da072 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1562,9 +1562,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     @Override
     public void transfer(final FlowFile flowFile, final Relationship relationship) {
         validateRecordState(flowFile);
-        final StandardRepositoryRecord record = records.get(flowFile);
-        record.setTransferRelationship(relationship);
-        updateLastQueuedDate(record);
         final int numDestinations = context.getConnections(relationship).size();
         final int multiplier = Math.max(1, numDestinations);
 
@@ -1575,7 +1572,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             autoTerminated = true;
         } else if (numDestinations == 0 && relationship == Relationship.SELF) {
             selfRelationship = true;
+        } else if (numDestinations == 0) {
+            // the relationship specified is not known in this session/context
+            throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
         }
+        final StandardRepositoryRecord record = records.get(flowFile);
+        record.setTransferRelationship(relationship);
+        updateLastQueuedDate(record);
 
         if (autoTerminated) {
             removedCount += multiplier;
@@ -1616,6 +1619,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             autoTerminated = true;
         } else if (numDestinations == 0 && relationship == Relationship.SELF) {
             selfRelationship = true;
+        } else if (numDestinations == 0) {
+            // the relationship specified is not known in this session/context
+            throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
         }
 
         final int multiplier = Math.max(1, numDestinations);

http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 55c9f5a..23a170e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -97,6 +97,7 @@ public class TestStandardProcessSession {
 
     private ProvenanceEventRepository provenanceRepo;
     private MockFlowFileRepository flowFileRepo;
+    private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
 
     @After
     public void cleanup() {
@@ -187,11 +188,14 @@ public class TestStandardProcessSession {
                 final Relationship relationship = (Relationship) arguments[0];
                 if (relationship == Relationship.SELF) {
                     return Collections.emptySet();
-                } else {
+                } else if (relationship == FAKE_RELATIONSHIP || relationship.equals(FAKE_RELATIONSHIP) ){
+                    return null;
+                }else {
                     return new HashSet<>(connList);
                 }
             }
         }).when(connectable).getConnections(Mockito.any(Relationship.class));
+
         when(connectable.getConnections()).thenReturn(new HashSet<>(connList));
 
         contentRepo = new MockContentRepository();
@@ -1250,6 +1254,34 @@ public class TestStandardProcessSession {
         session.commit();
     }
 
+    @Test
+    public void testTransferUnknownRelationship() {
+        final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
+            .id(1L)
+            .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+            .entryDate(System.currentTimeMillis())
+            .build();
+
+        flowFileQueue.put(flowFileRecord1);
+
+        FlowFile ff1 = session.get();
+        ff1 = session.putAttribute(ff1, "index", "1");
+
+        try {
+            session.transfer(ff1, FAKE_RELATIONSHIP);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (IllegalArgumentException iae) {
+        }
+
+        try {
+            final Collection<FlowFile> collection = new HashSet<>();
+            collection.add(ff1);
+            session.transfer(collection, FAKE_RELATIONSHIP);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (IllegalArgumentException iae) {
+        }
+    }
+
     private static class MockFlowFileRepository implements FlowFileRepository {
         private boolean failOnUpdate = false;
         private final AtomicLong idGenerator = new AtomicLong(0L);

http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java
index ad41850..8eb8ee0 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java
@@ -16,21 +16,7 @@
  */
 package org.apache.nifi.processors.script;
 
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.StringUtils;
-
-import javax.script.ScriptEngine;
-import javax.script.ScriptEngineFactory;
-import javax.script.ScriptEngineManager;
-import javax.script.ScriptException;
 import java.io.File;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -213,6 +199,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
     /**
      * Performs common setup operations when the processor is scheduled to run. This method assumes the member
      * variables associated with properties have been filled.
+     * @param numberOfScriptEngines number of engines to setup
      */
     public void setup(int numberOfScriptEngines) {
 
@@ -231,6 +218,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
      * javax.script APIs. Then, if any script configurators have been defined for this engine, their init() method is
      * called, and the configurator is saved for future calls.
      *
+     * @param numberOfScriptEngines number of engines to setup
      * @see org.apache.nifi.processors.script.ScriptEngineConfigurator
      */
     protected void setupEngines(int numberOfScriptEngines) {
@@ -316,6 +304,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
      * If the parameter is null or empty, this class's classloader is returned
      *
      * @param modules An array of URLs to add to the class loader
+     * @return ClassLoader for script engine
      */
     protected ClassLoader createScriptEngineModuleClassLoader(URL[] modules) {
         ClassLoader thisClassLoader = this.getClass().getClassLoader();

http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
index b4a6c0d..dd3ae58 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
@@ -63,16 +63,14 @@ import org.apache.nifi.processor.util.StandardValidators;
 public class InvokeScriptedProcessor extends AbstractScriptProcessor {
 
     private final AtomicReference<Processor> processor = new AtomicReference<>();
-    private final AtomicReference<Collection<ValidationResult>> validationResults =
-            new AtomicReference<>((Collection<ValidationResult>) new ArrayList<ValidationResult>());
+    private final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>());
 
     private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
 
     private ScriptEngine scriptEngine = null;
 
     /**
-     * Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script
-     * processor has defined additional relationships, those will be added as well.
+     * Returns the valid relationships for this processor as supplied by the script itself.
      *
      * @return a Set of Relationships supported by this processor
      */
@@ -82,7 +80,10 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
         final Processor instance = processor.get();
         if (instance != null) {
             try {
-                relationships.addAll(instance.getRelationships());
+                final Set<Relationship> rels = instance.getRelationships();
+                if(rels != null && !rels.isEmpty()){
+                    relationships.addAll(rels);
+                }
             } catch (final Throwable t) {
                 final ComponentLog logger = getLogger();
                 final String message = "Unable to get relationships from scripted Processor: " + t;
@@ -92,10 +93,6 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
                     logger.error(message, t);
                 }
             }
-        } else {
-            // Return defaults for now
-            relationships.add(REL_SUCCESS);
-            relationships.add(REL_FAILURE);
         }
         return Collections.unmodifiableSet(relationships);
     }
@@ -493,6 +490,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
     }
 
     @OnStopped
+    @Override
     public void stop() {
         super.stop();
         processor.set(null);

http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
index 2dc700d..e0007fe 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
@@ -58,8 +58,8 @@ public class TestInvokeGroovy extends BaseScriptTest {
         runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
         runner.run();
 
-        runner.assertAllFlowFilesTransferred("success", 1);
-        final List<MockFlowFile> result = runner.getFlowFilesForRelationship("success");
+        runner.assertAllFlowFilesTransferred("test", 1);
+        final List<MockFlowFile> result = runner.getFlowFilesForRelationship("test");
         result.get(0).assertAttributeEquals("from-content", "test content");
     }
 
@@ -166,8 +166,8 @@ public class TestInvokeGroovy extends BaseScriptTest {
         runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(InvokeScriptedProcessor.REL_FAILURE, 1);
-        final List<MockFlowFile> result = runner.getFlowFilesForRelationship(InvokeScriptedProcessor.REL_FAILURE);
+        runner.assertAllFlowFilesTransferred("FAILURE", 1);
+        final List<MockFlowFile> result = runner.getFlowFilesForRelationship("FAILURE");
         assertFalse(result.isEmpty());
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy
index 90d7d6c..3830616 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy
@@ -18,13 +18,18 @@ class testScriptRoutesToFailure implements Processor {
 
   def ComponentLog log
 
+  def REL_FAILURE = new Relationship.Builder()
+          .name("FAILURE")
+          .description("A FAILURE relationship")
+          .build();
+
   @Override
   void initialize(ProcessorInitializationContext context) {
   }
 
   @Override
   Set<Relationship> getRelationships() {
-      return [] as Set
+        return [REL_FAILURE] as Set
   }
 
   @Override
@@ -32,7 +37,7 @@ class testScriptRoutesToFailure implements Processor {
       def session = sessionFactory.createSession()
       def flowFile = session.get()
       if(!flowFile) return
-      session.transfer(flowFile, InvokeScriptedProcessor.REL_FAILURE)
+      session.transfer(flowFile, REL_FAILURE)
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/cfaacb1d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
index 9778f87..414b9de 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
@@ -43,7 +43,7 @@ class GroovyProcessor implements Processor {
         }
         flowFile = session.putAttribute(flowFile, "from-content", "test content")
         // transfer
-        session.transfer(flowFile, InvokeScriptedProcessor.REL_SUCCESS)
+        session.transfer(flowFile, REL_TEST)
         session.commit()
     }