You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2017/11/02 03:21:38 UTC
[27/50] [abbrv] incubator-edgent git commit: improve PressureReliever tests
improve PressureReliever tests
- add "simple" test that verifies relief/dropping
- try to make the "continuous" test better and usable on CI
Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/39764b53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/39764b53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/39764b53
Branch: refs/heads/develop
Commit: 39764b534663d5f5ae6f15164cf7dcf16481103f
Parents: 177e912
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Oct 27 14:31:30 2017 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Oct 27 14:31:30 2017 -0400
----------------------------------------------------------------------
.../edgent/test/topology/PlumbingTest.java | 144 +++++++++++++------
1 file changed, 100 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/39764b53/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
index 6fe2e2a..644ac4f 100644
--- a/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
@@ -161,67 +161,49 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
}
@Test
- public void testPressureReliever() throws Exception {
- // Timing variances on shared machines can cause this test to fail
- assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+ public void testPressureRelieverDrop() throws Exception {
Topology topology = newTopology();
- TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS);
-
+ // Verify the pressureReliever drops and retains the most recent when
+ // backpressure exists.
+ //
+ // Here, all the tuples hit the reliever at once, the downstream processing (oneShotDelay)
+ // causes a backup causing the reliever's queue to become full and drop tuples.
+ // The first tuple should be processed, then the last (most recent) N (N==queue depth).
- TStream<TimeAndId> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 5);
+ String[] tuples = {"A", "B", "C", "D", "E", "F", "G", "H"};
+ String[] expTuples = {"A", "F", "G", "H"}; // with queue depth of 3
+ TStream<String> raw = topology.strings(tuples);
- // insert a blocking delay acting as downstream operator that cannot keep up
- TStream<TimeAndId> slow = PlumbingStreams.blockingDelay(pr, 200, TimeUnit.MILLISECONDS);
+ TStream<String> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 3);
- // calculate the delay
- TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v));
-
- // Also process raw that should be unaffected by the slow path
- TStream<String> processed = raw.asString();
+ TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 1, TimeUnit.SECONDS);
+ Condition<Long> tcCount = topology.getTester().tupleCount(pr2, expTuples.length);
+ Condition<List<String>> contents = topology.getTester().streamContents(pr2, expTuples);
+ complete(topology, tcCount);
- Condition<Long> tcSlowCount = topology.getTester().atLeastTupleCount(slow, 20);
- Condition<List<TimeAndId>> tcRaw = topology.getTester().streamContents(raw);
- Condition<List<TimeAndId>> tcSlow = topology.getTester().streamContents(slow);
- Condition<List<TimeAndId>> tcSlowM = topology.getTester().streamContents(slowM);
- Condition<List<String>> tcProcessed = topology.getTester().streamContents(processed);
- complete(topology, tcSlowCount);
-
- assertTrue(tcProcessed.getResult().size() > tcSlowM.getResult().size());
- for (TimeAndId delay : tcSlowM.getResult())
- assertTrue("delay:"+delay, delay.ms < 300);
-
- // Must not lose any tuples in the non relieving path
- Set<TimeAndId> uniq = new HashSet<>(tcRaw.getResult());
- assertEquals(tcRaw.getResult().size(), uniq.size());
-
- // Must not lose any tuples in the non relieving path
- Set<String> uniqProcessed = new HashSet<>(tcProcessed.getResult());
- assertEquals(tcProcessed.getResult().size(), uniqProcessed.size());
-
- assertEquals(uniq.size(), uniqProcessed.size());
-
- // Might lose tuples, but must not have send duplicates
- uniq = new HashSet<>(tcSlow.getResult());
- assertEquals(tcSlow.getResult().size(), uniq.size());
+ assertTrue(tcCount.valid());
+ assertTrue(contents.valid());
}
@Test
- public void testPressureRelieverWithInitialDelay() throws Exception {
+ public void testPressureRelieverNoDrop() throws Exception {
Topology topology = newTopology();
+ // Same pipeline config as testPressureRelieverDrop but the reliever queue is
+ // big enough to avoid drops
+ String[] tuples = {"A", "B", "C", "D", "E", "F", "G", "H"};
+ TStream<String> raw = topology.strings(tuples);
- TStream<String> raw = topology.strings("A", "B", "C", "D", "E", "F", "G", "H");
+ TStream<String> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 100);
- TStream<String> pr = PlumbingStreams.pressureReliever(raw, v -> 0, 100);
+ TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 1, TimeUnit.SECONDS);
- TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 5, TimeUnit.SECONDS);
-
- Condition<Long> tcCount = topology.getTester().tupleCount(pr2, 8);
- Condition<List<String>> contents = topology.getTester().streamContents(pr2, "A", "B", "C", "D", "E", "F", "G", "H");
+ Condition<Long> tcCount = topology.getTester().tupleCount(pr2, tuples.length);
+ Condition<List<String>> contents = topology.getTester().streamContents(pr2, tuples);
complete(topology, tcCount);
assertTrue(tcCount.valid());
@@ -229,6 +211,80 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
}
@Test
+ public void testPressureRelieverContinuous() throws Exception {
+ // Timing variances on shared machines can cause this test to fail
+ //assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+ // Try to verify more continuous reliever behavior, in contrast to
+ // the other pressure reliever tests where the backpressure only exists
+ // at the beginning.
+ //
+ // Generate @ 100tps, consume @ 5tps.
+ // With reliever depth=1, roughly should process every 20th tuple, with essentially
+ // no delay in the queue (certainly less than say 50% of the consumer delay, hence < 0.5 * 200ms)
+
+ Topology topology = newTopology();
+
+ TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS);
+
+ TStream<TimeAndId> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 1);
+
+ TStream<TimeAndId> slow = PlumbingStreams.blockingDelay(pr, 200, TimeUnit.MILLISECONDS);
+
+ // calculate the delay (queue time + consumer processing)
+ TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v));
+
+ // Also process raw that should be unaffected by the slow path
+ TStream<TimeAndId> processed = raw.filter(t -> true);
+
+
+ Condition<Long> tcSlowMCount = topology.getTester().atLeastTupleCount(slowM, 10);
+ Condition<List<TimeAndId>> tcSlowM = topology.getTester().streamContents(slowM);
+ Condition<List<TimeAndId>> tcProcessed = topology.getTester().streamContents(processed);
+ complete(topology, tcSlowMCount);
+
+ System.out.println(String.format("testPressureRelieverContinuous() fastCnt:%d slowCnt:%d",
+ tcProcessed.getResult().size(), tcSlowM.getResult().size()));
+ System.out.println("slow: "+tcSlowM.getResult());
+
+ // No lost tuples in the fast path (successive Ids, starting @ 1)
+ assertEquals("fastpath tuples dropped",
+ tcProcessed.getResult().size(),
+ tcProcessed.getResult().get(tcProcessed.getResult().size()-1).id);
+
+ // No dup tuples in the fast path
+ Set<TimeAndId> uniqRaw = new HashSet<>(tcProcessed.getResult());
+ assertEquals("fastpath tuples duplicated", tcProcessed.getResult().size(), uniqRaw.size());
+
+ // fastpath count should be roughly 20x the slow delayed/relieved count
+ assertTrue("rawCnt:"+tcProcessed.getResult().size()+" slowMCnt:"+tcSlowM.getResult().size(),
+ tcProcessed.getResult().size() >= 15 * +tcSlowM.getResult().size());
+
+ // slow should process roughly every 20th tuple... not successive ones
+ TimeAndId prevId = null;
+ for (TimeAndId id : tcSlowM.getResult()) {
+ if (prevId == null) {
+ // should have processed the 1st tuple
+ assertEquals("slow firstId", 1, id.id);
+ }
+ else {
+ // seems like this could be sensitive to host load
+ assertTrue("slow ids prevId:"+prevId+" id:"+id,
+ id.id >= prevId.id + 15
+ && id.id <= prevId.id + 25);
+ }
+ prevId = id;
+ }
+
+ // every slow tuple should be processed near instantaneously - shouldn't wait
+ // long in the queue.
+ for (TimeAndId id : tcSlowM.getResult()) {
+ assertTrue("slow delays prevId:"+prevId+" id:"+id,
+ id.ms <= 300); // 200ms consumer processing + up to 50% of that waiting
+ }
+ }
+
+ @Test
public void testValveState() throws Exception {
Valve<Integer> valve = new Valve<>();
assertTrue(valve.isOpen());