You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/04/18 13:56:34 UTC
nifi git commit: NIFI-1777 Prevent deleting a connection going to a
running processor
Repository: nifi
Updated Branches:
refs/heads/master 1a57b37dc -> f719cbf60
NIFI-1777 Prevent deleting a connection going to a running processor
NIFI-1777 Added unit tests to test processor removal
This closes #357
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f719cbf6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f719cbf6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f719cbf6
Branch: refs/heads/master
Commit: f719cbf60c58d52facaad69daf8c75f4cca5b8b6
Parents: 1a57b37
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri Apr 15 16:49:47 2016 +0200
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Apr 18 07:55:59 2016 -0400
----------------------------------------------------------------------
.../nifi/connectable/StandardConnection.java | 7 ++
.../scheduling/TestProcessorLifecycle.java | 70 ++++++++++++++++++++
2 files changed, 77 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/f719cbf6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index d43a3db..1176147 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -392,5 +392,12 @@ public final class StandardConnection implements Connection {
throw new IllegalStateException("Source of Connection (" + source + ") is running");
}
}
+
+ final Connectable dest = destination.get();
+ if (dest.isRunning()) {
+ if (!ConnectableType.FUNNEL.equals(dest.getConnectableType())) {
+ throw new IllegalStateException("Destination of Connection (" + dest + ") is running");
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f719cbf6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index f98ed45..2cde97b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock;
import java.io.File;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -45,6 +47,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
+import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
@@ -575,6 +578,73 @@ public class TestProcessorLifecycle {
}
/**
+ * Test deletion of processor when connected to another
+ * @throws Exception exception
+ */
+ @Test
+ public void validateProcessorDeletion() throws Exception {
+ FlowController fc = this.buildFlowControllerForTest();
+ ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+ this.setControllerRootGroup(fc, testGroup);
+
+ ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ testProcNodeA.setProperty("P", "hello");
+ testGroup.addProcessor(testProcNodeA);
+
+ ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ testProcNodeB.setProperty("P", "hello");
+ testGroup.addProcessor(testProcNodeB);
+
+ Collection<String> relationNames = new ArrayList<String>();
+ relationNames.add("relation");
+ Connection connection = fc.createConnection(UUID.randomUUID().toString(), Connection.class.getName(), testProcNodeA, testProcNodeB, relationNames);
+ testGroup.addConnection(connection);
+
+ ProcessScheduler ps = fc.getProcessScheduler();
+ ps.startProcessor(testProcNodeA);
+ ps.startProcessor(testProcNodeB);
+
+ try {
+ testGroup.removeProcessor(testProcNodeA);
+ fail();
+ } catch (Exception e) {
+ // should throw exception because processor running
+ }
+
+ try {
+ testGroup.removeProcessor(testProcNodeB);
+ fail();
+ } catch (Exception e) {
+ // should throw exception because processor running
+ }
+
+ ps.stopProcessor(testProcNodeB);
+ Thread.sleep(100);
+
+ try {
+ testGroup.removeProcessor(testProcNodeA);
+ fail();
+ } catch (Exception e) {
+ // should throw exception because destination processor running
+ }
+
+ try {
+ testGroup.removeProcessor(testProcNodeB);
+ fail();
+ } catch (Exception e) {
+ // should throw exception because source processor running
+ }
+
+ ps.stopProcessor(testProcNodeA);
+ Thread.sleep(100);
+
+ testGroup.removeProcessor(testProcNodeA);
+ testGroup.removeProcessor(testProcNodeB);
+ testGroup.shutdown();
+ fc.shutdown(true);
+ }
+
+ /**
* Scenario where onTrigger() is executed with random delay limited to
* 'delayLimit', yet with guaranteed exit from onTrigger().
*/