You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/04/18 13:56:34 UTC

nifi git commit: NIFI-1777 Prevent deleting a connection going to a running processor

Repository: nifi
Updated Branches:
  refs/heads/master 1a57b37dc -> f719cbf60


NIFI-1777 Prevent deleting a connection going to a running processor

NIFI-1777 Added unit tests to test processor removal
This closes #357


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f719cbf6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f719cbf6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f719cbf6

Branch: refs/heads/master
Commit: f719cbf60c58d52facaad69daf8c75f4cca5b8b6
Parents: 1a57b37
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri Apr 15 16:49:47 2016 +0200
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Apr 18 07:55:59 2016 -0400

----------------------------------------------------------------------
 .../nifi/connectable/StandardConnection.java    |  7 ++
 .../scheduling/TestProcessorLifecycle.java      | 70 ++++++++++++++++++++
 2 files changed, 77 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f719cbf6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index d43a3db..1176147 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -392,5 +392,12 @@ public final class StandardConnection implements Connection {
                 throw new IllegalStateException("Source of Connection (" + source + ") is running");
             }
         }
+
+        final Connectable dest = destination.get();
+        if (dest.isRunning()) {
+            if (!ConnectableType.FUNNEL.equals(dest.getConnectableType())) {
+                throw new IllegalStateException("Destination of Connection (" + dest + ") is running");
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f719cbf6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index f98ed45..2cde97b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -45,6 +47,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
+import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.FlowController;
@@ -575,6 +578,73 @@ public class TestProcessorLifecycle {
     }
 
     /**
+     * Test deletion of processor when connected to another
+     * @throws Exception exception
+     */
+    @Test
+    public void validateProcessorDeletion() throws Exception {
+        FlowController fc = this.buildFlowControllerForTest();
+        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+
+        ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+        testProcNodeA.setProperty("P", "hello");
+        testGroup.addProcessor(testProcNodeA);
+
+        ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+        testProcNodeB.setProperty("P", "hello");
+        testGroup.addProcessor(testProcNodeB);
+
+        Collection<String> relationNames = new ArrayList<String>();
+        relationNames.add("relation");
+        Connection connection = fc.createConnection(UUID.randomUUID().toString(), Connection.class.getName(), testProcNodeA, testProcNodeB, relationNames);
+        testGroup.addConnection(connection);
+
+        ProcessScheduler ps = fc.getProcessScheduler();
+        ps.startProcessor(testProcNodeA);
+        ps.startProcessor(testProcNodeB);
+
+        try {
+            testGroup.removeProcessor(testProcNodeA);
+            fail();
+        } catch (Exception e) {
+            // should throw exception because processor running
+        }
+
+        try {
+            testGroup.removeProcessor(testProcNodeB);
+            fail();
+        } catch (Exception e) {
+            // should throw exception because processor running
+        }
+
+        ps.stopProcessor(testProcNodeB);
+        Thread.sleep(100);
+
+        try {
+            testGroup.removeProcessor(testProcNodeA);
+            fail();
+        } catch (Exception e) {
+            // should throw exception because destination processor running
+        }
+
+        try {
+            testGroup.removeProcessor(testProcNodeB);
+            fail();
+        } catch (Exception e) {
+            // should throw exception because source processor running
+        }
+
+        ps.stopProcessor(testProcNodeA);
+        Thread.sleep(100);
+
+        testGroup.removeProcessor(testProcNodeA);
+        testGroup.removeProcessor(testProcNodeB);
+        testGroup.shutdown();
+        fc.shutdown(true);
+    }
+
+    /**
      * Scenario where onTrigger() is executed with random delay limited to
      * 'delayLimit', yet with guaranteed exit from onTrigger().
      */