You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/11/30 20:00:09 UTC

nifi git commit: NIFI-4642 updated tests to be more tolerant of varying system speeds. Many of these should be integration tests rather than unit tests. This closes #2303.

Repository: nifi
Updated Branches:
  refs/heads/master 45df23b1e -> dd981e87d


NIFI-4642 updated tests to be more tolerant of varying system speeds. Many of these should be integration tests rather than unit tests. This closes #2303.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dd981e87
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dd981e87
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dd981e87

Branch: refs/heads/master
Commit: dd981e87ddadc907590a1d89ef3925f668cb00c1
Parents: 45df23b
Author: joewitt <jo...@apache.org>
Authored: Tue Nov 28 12:51:47 2017 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Nov 30 14:59:58 2017 -0500

----------------------------------------------------------------------
 .../scheduling/TestProcessorLifecycle.java      | 128 ++++++++-----------
 1 file changed, 53 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/dd981e87/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index c544ef4..f8f0426 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -72,6 +72,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -86,7 +87,7 @@ public class TestProcessorLifecycle {
 
     private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class);
     private FlowController fc;
-    private Map<String,String> properties = new HashMap<>();
+    private Map<String, String> properties = new HashMap<>();
     private volatile String propsFile = TestProcessorLifecycle.class.getResource("/lifecycletest.nifi.properties").getFile();
 
     @Before
@@ -100,6 +101,23 @@ public class TestProcessorLifecycle {
         FileUtils.deleteDirectory(new File("./target/lifecycletest"));
     }
 
+    private void assertCondition(final Supplier<Boolean> supplier) {
+        assertCondition(supplier, 1000L);
+    }
+
+    private void assertCondition(final Supplier<Boolean> supplier, final long delayToleranceMillis) {
+        final long startTime = System.currentTimeMillis();
+        while (((System.currentTimeMillis() - startTime) < delayToleranceMillis) && !supplier.get()) {
+            try {
+                Thread.sleep(50);
+            } catch (InterruptedException ex) {
+                Thread.interrupted();
+                break;
+            }
+        }
+        assertTrue(supplier.get());
+    }
+
     @Test
     public void validateEnableOperation() throws Exception {
         final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
@@ -109,17 +127,17 @@ public class TestProcessorLifecycle {
         final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
                 UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
 
-        assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
-        assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState());
         // validates idempotency
         for (int i = 0; i < 2; i++) {
             testProcNode.enable();
         }
-        assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
-        assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState());
         testProcNode.disable();
-        assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState());
-        assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState());
+        assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
     }
 
     @Test
@@ -132,18 +150,18 @@ public class TestProcessorLifecycle {
         final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
                 UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
-        assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
-        assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState());
         // validates idempotency
         for (int i = 0; i < 2; i++) {
             testProcNode.disable();
         }
-        assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState());
-        assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState());
+        assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
 
         ProcessScheduler ps = fc.getProcessScheduler();
         ps.startProcessor(testProcNode);
-        assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState());
+        assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
     }
 
     /**
@@ -171,7 +189,7 @@ public class TestProcessorLifecycle {
         ps.startProcessor(testProcNode);
 
         Thread.sleep(500);
-        assertEquals(1, testProcessor.operationNames.size());
+        assertCondition(() -> testProcessor.operationNames.size() == 1);
         assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
     }
 
@@ -189,14 +207,14 @@ public class TestProcessorLifecycle {
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
         // sets the scenario for the processor to run
         int randomDelayLimit = 3000;
         this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
         final ProcessScheduler ps = fc.getProcessScheduler();
         ps.stopProcessor(testProcNode);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        assertTrue(testProcessor.operationNames.size() == 0);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
+        assertTrue(testProcessor.operationNames.isEmpty());
     }
 
     /**
@@ -205,7 +223,7 @@ public class TestProcessorLifecycle {
      */
     @Test
     @Ignore
-    public void validateSuccessfullAndOrderlyShutdown() throws Exception {
+    public void validateSuccessfulAndOrderlyShutdown() throws Exception {
         final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
         fc = fcsb.getFlowController();
         ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
@@ -226,8 +244,7 @@ public class TestProcessorLifecycle {
         testGroup.addProcessor(testProcNode);
 
         fc.startProcessGroup(testGroup.getIdentifier());
-        Thread.sleep(2000); // let it run for a while
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
+        assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
 
         fc.stopAllProcessors();
 
@@ -235,9 +252,9 @@ public class TestProcessorLifecycle {
 
         // validates that regardless of how many running tasks, lifecycle
         // operation are invoked atomically (once each).
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 1000L);
         // . . . hence only 3 operations must be in the list
-        assertEquals(3, testProcessor.operationNames.size());
+        assertCondition(() -> testProcessor.operationNames.size() == 3, 2000L);
         // . . . and ordered as @OnScheduled, @OnUnscheduled, @OnStopped
         assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
         assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
@@ -268,7 +285,7 @@ public class TestProcessorLifecycle {
         ExecutorService executor = Executors.newFixedThreadPool(100);
         int startCallsCount = 10000;
         final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
         final Random random = new Random();
         for (int i = 0; i < startCallsCount / 2; i++) {
             executor.execute(new Runnable() {
@@ -326,18 +343,11 @@ public class TestProcessorLifecycle {
         ProcessScheduler ps = fc.getProcessScheduler();
 
         ps.startProcessor(testProcNode);
-        Thread.sleep(1000);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
+        assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 5000L);
 
         ps.stopProcessor(testProcNode);
-        Thread.sleep(100);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        Thread.sleep(1000);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-
-        assertEquals(2, testProcessor.operationNames.size());
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L);
+        assertCondition(() -> testProcessor.operationNames.size() == 2, 8000L);
         assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
         assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
     }
@@ -366,15 +376,9 @@ public class TestProcessorLifecycle {
         ProcessScheduler ps = fc.getProcessScheduler();
 
         ps.startProcessor(testProcNode);
-        Thread.sleep(1000);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
-        Thread.sleep(1000);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
-        Thread.sleep(100);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
+        assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 10000L);
         ps.stopProcessor(testProcNode);
-        Thread.sleep(500);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
     }
 
     /**
@@ -401,15 +405,9 @@ public class TestProcessorLifecycle {
         ProcessScheduler ps = fc.getProcessScheduler();
 
         ps.startProcessor(testProcNode);
-        Thread.sleep(1000);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
-        Thread.sleep(1000);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
+        assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
         ps.stopProcessor(testProcNode);
-        Thread.sleep(100);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
-        Thread.sleep(500);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
     }
 
     /**
@@ -432,15 +430,9 @@ public class TestProcessorLifecycle {
         ProcessScheduler ps = fc.getProcessScheduler();
 
         ps.startProcessor(testProcNode);
-        Thread.sleep(1000);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
-        Thread.sleep(1000);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
+        assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
         ps.stopProcessor(testProcNode);
-        Thread.sleep(100);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
-        Thread.sleep(4000);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L);
     }
 
     /**
@@ -463,20 +455,9 @@ public class TestProcessorLifecycle {
         ProcessScheduler ps = fc.getProcessScheduler();
 
         ps.startProcessor(testProcNode);
-        Thread.sleep(1000);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
-        Thread.sleep(1000);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
-        ps.disableProcessor(testProcNode); // no effect
-        Thread.sleep(100);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
+        assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 3000L);
         ps.stopProcessor(testProcNode);
-        Thread.sleep(100);
-        assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        Thread.sleep(4000);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 4000L);
     }
 
     /**
@@ -501,14 +482,11 @@ public class TestProcessorLifecycle {
         ProcessScheduler ps = fc.getProcessScheduler();
 
         ps.startProcessor(testProcNode);
-        Thread.sleep(1000);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
+        assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
         ps.disableProcessor(testProcNode);
-        Thread.sleep(100);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
+        assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
         ps.stopProcessor(testProcNode);
-        Thread.sleep(500);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
     }
 
     /**