You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/11/15 20:02:19 UTC
nifi git commit: NIFI-5824: Added unit test to FlowController to
ensure that the ProcessScheduler that it creates is properly initialized.
Also updated the properties file used by TestFlowController to use a
VolatileContentRepository instead of FileSystemRepository ...
Repository: nifi
Updated Branches:
refs/heads/master 76b0065a6 -> be0949570
NIFI-5824: Added unit test to FlowController to ensure that the ProcessScheduler that it creates is properly initialized. Also updated the properties file used by TestFlowController to use a VolatileContentRepository instead of FileSystemRepository, and fixed EventDrivenWorkerQueue to return if calls to poll() are interrupted (via Thread.interrupt) - making these minor fixes resulted in the unit test TestFlowController running in 2 seconds instead of 30+ seconds on my machine
This closes #3173.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/be094957
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/be094957
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/be094957
Branch: refs/heads/master
Commit: be0949570a66f672e128ac97c936df546c7d2521
Parents: 76b0065
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Nov 15 14:26:36 2018 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Nov 15 15:01:56 2018 -0500
----------------------------------------------------------------------
.../nifi/controller/EventDrivenWorkerQueue.java | 12 ++++++-----
.../scheduling/StandardProcessScheduler.java | 9 +++++++--
.../nifi/controller/TestFlowController.java | 21 ++++++++++++++++++++
.../flowcontrollertest.nifi.properties | 3 +++
4 files changed, 38 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
index f36a459..25e8a86 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
@@ -16,6 +16,11 @@
*/
package org.apache.nifi.controller;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.Connectables;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -25,11 +30,6 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.util.Connectables;
-
public class EventDrivenWorkerQueue implements WorkerQueue {
private final Object workMonitor = new Object();
@@ -69,6 +69,8 @@ public class EventDrivenWorkerQueue implements WorkerQueue {
try {
workMonitor.wait(timeLeft);
} catch (final InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ return null;
}
} else {
// Decrement the amount of work there is to do for this worker.
http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 2902f6a..2ff3307 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -35,6 +35,7 @@ import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.logging.ComponentLog;
@@ -103,6 +104,10 @@ public final class StandardProcessScheduler implements ProcessScheduler {
frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread");
}
+ public ControllerServiceProvider getControllerServiceProvider() {
+ return flowController.getControllerServiceProvider();
+ }
+
private StateManager getStateManager(final String componentId) {
return stateManagerProvider.getStateManager(componentId);
}
@@ -293,7 +298,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode procNode, final boolean failIfStopping) {
final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true);
- final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
+ final StandardProcessContext processContext = new StandardProcessContext(procNode, getControllerServiceProvider(),
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -333,7 +338,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
public synchronized CompletableFuture<Void> stopProcessor(final ProcessorNode procNode) {
final LifecycleState lifecycleState = getLifecycleState(procNode, false);
- StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
+ StandardProcessContext processContext = new StandardProcessContext(procNode, getControllerServiceProvider(),
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
LOG.info("Stopping {}", procNode);
http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index e3c91b2..651ce9c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -34,8 +34,10 @@ import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.mock.DummyProcessor;
import org.apache.nifi.controller.service.mock.DummyReportingTask;
import org.apache.nifi.controller.service.mock.ServiceA;
@@ -95,6 +97,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -126,6 +129,7 @@ public class TestFlowController {
return StringEncryptor.createEncryptor(algorithm, provider, password);
}
+
@Before
public void setup() {
@@ -337,6 +341,23 @@ public class TestFlowController {
controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
}
+ /**
+ * StandardProcessScheduler is created by FlowController. The StandardProcessScheduler needs access to the Controller Service Provider,
+ * but the Controller Service Provider needs the ProcessScheduler in its constructor. So the StandardProcessScheduler obtains the Controller Service
+ * Provider by making a call back to FlowController.getControllerServiceProvider. This test exists to ensure that we always have access to the
+ * Controller Service Provider in the Process Scheduler, and that we don't inadvertently start storing away the result of calling
+ * FlowController.getControllerServiceProvider() before the service provider has been fully initialized.
+ */
+ @Test
+ public void testProcessSchedulerHasAccessToControllerServiceProvider() {
+ final StandardProcessScheduler scheduler = controller.getProcessScheduler();
+ assertNotNull(scheduler);
+
+ final ControllerServiceProvider serviceProvider = scheduler.getControllerServiceProvider();
+ assertNotNull(serviceProvider);
+ assertSame(serviceProvider, controller.getControllerServiceProvider());
+ }
+
@Test
public void testSynchronizeFlowWhenCurrentAuthorizationsAreEmptyAndProposedAreNot() {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties
index d9aa4d2..a4c1a4a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties
@@ -48,6 +48,9 @@ nifi.swap.out.period=5 sec
nifi.swap.out.threads=4
# Content Repository
+nifi.content.repository.implementation=org.apache.nifi.controller.repository.VolatileContentRepository
+nifi.volatile.content.repository.max.size=1 KB
+nifi.volatile.content.repository.block.size=1 KB
nifi.content.claim.max.appendable.size=10 MB
nifi.content.claim.max.flow.files=100
nifi.content.repository.directory.default=./target/flowcontrollertest/content_repository