Posted to commits@edgent.apache.org by dl...@apache.org on 2017/11/02 03:21:38 UTC

[27/50] [abbrv] incubator-edgent git commit: improve PressureReliever tests

improve PressureReliever tests

- add "simple" test that verifies relief/dropping
- try to make the "continuous" test better and usable on CI
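
For context, here is a minimal standalone sketch of the relief/dropping behavior the new "simple" test verifies. It mirrors the test added in this commit (8-tuple burst, reliever queue depth 3, slow downstream stage); the class name, topology name, and the use of print() instead of the Tester are illustrative only and assume a DirectProvider is on the classpath.

    import java.util.concurrent.TimeUnit;

    import org.apache.edgent.function.Functions;
    import org.apache.edgent.providers.direct.DirectProvider;
    import org.apache.edgent.topology.TStream;
    import org.apache.edgent.topology.Topology;
    import org.apache.edgent.topology.plumbing.PlumbingStreams;

    public class PressureRelieverSketch {
        public static void main(String[] args) {
            DirectProvider dp = new DirectProvider();
            Topology topology = dp.newTopology("prSketch");

            // An 8-tuple burst arrives all at once.
            TStream<String> raw = topology.strings("A", "B", "C", "D", "E", "F", "G", "H");

            // Decouple the source from a slow consumer with a queue of depth 3.
            // When the queue is full, the oldest queued tuples are discarded.
            TStream<String> relieved =
                    PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 3);

            // A slow downstream stage: the initial 1s delay creates the backpressure.
            TStream<String> slow =
                    PlumbingStreams.blockingOneShotDelay(relieved, 1, TimeUnit.SECONDS);

            // Per the test in this commit, the expected output is "A" (taken
            // immediately by the consumer) followed by the 3 most recent tuples
            // "F", "G", "H"; "B".."E" are dropped.
            slow.print();

            dp.submit(topology);
        }
    }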

Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/39764b53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/39764b53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/39764b53

Branch: refs/heads/develop
Commit: 39764b534663d5f5ae6f15164cf7dcf16481103f
Parents: 177e912
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Oct 27 14:31:30 2017 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Oct 27 14:31:30 2017 -0400

----------------------------------------------------------------------
 .../edgent/test/topology/PlumbingTest.java      | 144 +++++++++++++------
 1 file changed, 100 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/39764b53/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
index 6fe2e2a..644ac4f 100644
--- a/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
@@ -161,67 +161,49 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
     }
     
     @Test
-    public void testPressureReliever() throws Exception {
-		// Timing variances on shared machines can cause this test to fail
-		assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+    public void testPressureRelieverDrop() throws Exception {
 
         Topology topology = newTopology();
         
-        TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS);
-           
+        // Verify that under backpressure the pressureReliever drops older tuples
+        // and retains the most recent ones.
+        //
+        // Here, all the tuples hit the reliever at once; the downstream processing (oneShotDelay)
+        // creates a backup, so the reliever's queue fills and older tuples are dropped.
+        // The first tuple should be processed, then the last (most recent) N (N == queue depth).
         
-        TStream<TimeAndId> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 5);
+        String[] tuples = {"A", "B", "C", "D", "E", "F", "G", "H"};
+        String[] expTuples = {"A", "F", "G", "H"};  // with queue depth of 3
+        TStream<String> raw = topology.strings(tuples);
         
-        // insert a blocking delay acting as downstream operator that cannot keep up
-        TStream<TimeAndId> slow = PlumbingStreams.blockingDelay(pr, 200, TimeUnit.MILLISECONDS);
+        TStream<String> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 3);
         
-        // calculate the delay
-        TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v));
-        
-        // Also process raw that should be unaffected by the slow path
-        TStream<String> processed = raw.asString();
+        TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 1, TimeUnit.SECONDS);
         
+        Condition<Long> tcCount = topology.getTester().tupleCount(pr2, expTuples.length);
+        Condition<List<String>> contents = topology.getTester().streamContents(pr2, expTuples);
+        complete(topology, tcCount);
         
-        Condition<Long> tcSlowCount = topology.getTester().atLeastTupleCount(slow, 20);
-        Condition<List<TimeAndId>> tcRaw = topology.getTester().streamContents(raw);
-        Condition<List<TimeAndId>> tcSlow = topology.getTester().streamContents(slow);
-        Condition<List<TimeAndId>> tcSlowM = topology.getTester().streamContents(slowM);
-        Condition<List<String>> tcProcessed = topology.getTester().streamContents(processed);
-        complete(topology, tcSlowCount);
-        
-        assertTrue(tcProcessed.getResult().size() > tcSlowM.getResult().size());
-        for (TimeAndId delay : tcSlowM.getResult())
-            assertTrue("delay:"+delay, delay.ms < 300);
-
-        // Must not lose any tuples in the non relieving path
-        Set<TimeAndId> uniq = new HashSet<>(tcRaw.getResult());
-        assertEquals(tcRaw.getResult().size(), uniq.size());
-
-        // Must not lose any tuples in the non relieving path
-        Set<String> uniqProcessed = new HashSet<>(tcProcessed.getResult());
-        assertEquals(tcProcessed.getResult().size(), uniqProcessed.size());
-        
-        assertEquals(uniq.size(), uniqProcessed.size());
-           
-        // Might lose tuples, but must not have send duplicates
-        uniq = new HashSet<>(tcSlow.getResult());
-        assertEquals(tcSlow.getResult().size(), uniq.size());
+        assertTrue(tcCount.valid());
+        assertTrue(contents.valid());
     }
     
     @Test
-    public void testPressureRelieverWithInitialDelay() throws Exception {
+    public void testPressureRelieverNoDrop() throws Exception {
 
         Topology topology = newTopology();
         
+        // Same pipeline config as testPressureRelieverDrop but the reliever queue is
+        // big enough to avoid drops
+        String[] tuples = {"A", "B", "C", "D", "E", "F", "G", "H"};
+        TStream<String> raw = topology.strings(tuples);
         
-        TStream<String> raw = topology.strings("A", "B", "C", "D", "E", "F", "G", "H");
+        TStream<String> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 100);
         
-        TStream<String> pr = PlumbingStreams.pressureReliever(raw, v -> 0, 100);
+        TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 1, TimeUnit.SECONDS);
         
-        TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 5, TimeUnit.SECONDS);
-        
-        Condition<Long> tcCount = topology.getTester().tupleCount(pr2, 8);
-        Condition<List<String>> contents = topology.getTester().streamContents(pr2, "A", "B", "C", "D", "E", "F", "G", "H");
+        Condition<Long> tcCount = topology.getTester().tupleCount(pr2, tuples.length);
+        Condition<List<String>> contents = topology.getTester().streamContents(pr2, tuples);
         complete(topology, tcCount);
         
         assertTrue(tcCount.valid());
@@ -229,6 +211,80 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
     }
     
     @Test
+    public void testPressureRelieverContinuous() throws Exception {
+		// Timing variances on shared machines can cause this test to fail
+		//assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+		// Try to verify more continuous reliever behavior, unlike the
+		// other pressure reliever tests where the backpressure only exists
+		// at the beginning.
+		//
+		// Generate @ 100tps, consume @ 5tps.
+		// With reliever depth=1, the consumer should process roughly every 20th tuple, with
+		// essentially no queueing delay (certainly less than 50% of the consumer delay, i.e. < 0.5 * 200ms).
+		
+        Topology topology = newTopology();
+
+        TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS);
+        
+        TStream<TimeAndId> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 1);
+        
+        TStream<TimeAndId> slow = PlumbingStreams.blockingDelay(pr, 200, TimeUnit.MILLISECONDS);
+        
+        // calculate the delay (queue time + consumer processing)
+        TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v));
+        
+        // Also process raw that should be unaffected by the slow path
+        TStream<TimeAndId> processed = raw.filter(t -> true);
+        
+        
+        Condition<Long> tcSlowMCount = topology.getTester().atLeastTupleCount(slowM, 10);
+        Condition<List<TimeAndId>> tcSlowM = topology.getTester().streamContents(slowM);
+        Condition<List<TimeAndId>> tcProcessed = topology.getTester().streamContents(processed);
+        complete(topology, tcSlowMCount);
+        
+        System.out.println(String.format("testPressureRelieverContinuous() fastCnt:%d slowCnt:%d",
+                tcProcessed.getResult().size(), tcSlowM.getResult().size()));
+        System.out.println("slow: "+tcSlowM.getResult());
+
+        // No lost tuples in the fast path (successive Ids, starting @ 1)
+        assertEquals("fastpath tuples dropped",
+                tcProcessed.getResult().size(),
+                tcProcessed.getResult().get(tcProcessed.getResult().size()-1).id);
+
+        // No dup tuples in the fast path
+        Set<TimeAndId> uniqRaw = new HashSet<>(tcProcessed.getResult());
+        assertEquals("fastpath tuples duplicated", tcProcessed.getResult().size(), uniqRaw.size());
+
+        // fastpath count should be roughly 20x the slow delayed/relieved count
+        assertTrue("rawCnt:"+tcProcessed.getResult().size()+" slowMCnt:"+tcSlowM.getResult().size(),
+                tcProcessed.getResult().size() >= 15 * tcSlowM.getResult().size());
+        
+        // slow should process roughly every 20th tuple... not successive ones
+        TimeAndId prevId = null;
+        for (TimeAndId id : tcSlowM.getResult()) {
+            if (prevId == null) {
+                // should have processed the 1st tuple
+                assertEquals("slow firstId", 1, id.id);
+            }
+            else {
+                // seems like this could be sensitive to host load
+                assertTrue("slow ids prevId:"+prevId+" id:"+id,
+                        id.id >= prevId.id + 15
+                        && id.id <= prevId.id + 25);
+            }
+            prevId = id;
+        }
+        
+        // every slow tuple should be processed near instantaneously - shouldn't wait
+        // long in the queue.
+        for (TimeAndId id : tcSlowM.getResult()) {
+            assertTrue("slow delay id:"+id,
+                    id.ms <= 300);  // 200ms consumer processing + up to 50% of that waiting
+        }
+    }
+    
+    @Test
     public void testValveState() throws Exception {
         Valve<Integer> valve = new Valve<>();
         assertTrue(valve.isOpen());