You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:38 UTC
[37/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/JsonFunctionsTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/JsonFunctionsTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/JsonFunctionsTest.java
new file mode 100644
index 0000000..a9ff459
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/JsonFunctionsTest.java
@@ -0,0 +1,78 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.edgent.function.Function;
+import org.apache.edgent.topology.json.JsonFunctions;
+import org.junit.Test;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+
+public class JsonFunctionsTest {
+
+ private JsonObject newTestObject() {
+ // Just a mix of things so we have reasonable confidence
+ // the JsonFunctions are working.
+ JsonObject jo = new JsonObject();
+ jo.addProperty("boolean", true);
+ jo.addProperty("character", 'c');
+ jo.addProperty("short", (short)7);
+ jo.addProperty("int", 23);
+ jo.addProperty("long", 99L);
+ jo.addProperty("float", 3.0f);
+ jo.addProperty("double", 7.128d);
+ jo.addProperty("string", "a string value");
+ JsonArray ja = new JsonArray();
+ ja.add(new JsonPrimitive(123));
+ ja.add(new JsonPrimitive(456));
+ jo.add("array", ja);
+ JsonObject jo2 = new JsonObject();
+ jo2.addProperty("int", 789);
+ jo.add("object", jo2);
+ return jo;
+ }
+
+ @Test
+ public void testStrings() {
+ JsonObject jo1 = newTestObject();
+ Function<JsonObject,String> asString = JsonFunctions.asString();
+ Function<String,JsonObject> fromString = JsonFunctions.fromString();
+
+ String s1 = asString.apply(jo1);
+ JsonObject jo2 = fromString.apply(s1);
+
+ assertEquals(jo2, jo1);
+ }
+
+ @Test
+ public void testBytes() {
+ JsonObject jo1 = newTestObject();
+ Function<JsonObject,byte[]> asBytes = JsonFunctions.asBytes();
+ Function<byte[],JsonObject> fromBytes = JsonFunctions.fromBytes();
+
+ byte[] b1 = asBytes.apply(jo1);
+ JsonObject jo2 = fromBytes.apply(b1);
+
+ assertEquals(jo2, jo1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
new file mode 100644
index 0000000..e77330b
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
@@ -0,0 +1,746 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Functions;
+import org.apache.edgent.function.ToIntFunction;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.apache.edgent.topology.plumbing.Valve;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+@Ignore
+public abstract class PlumbingTest extends TopologyAbstractTest {
+
+
+ @Test
+ public void testBlockingDelay() throws Exception {
+ // Timing variances on shared machines can cause this test to fail
+ assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+ Topology topology = newTopology();
+
+ TStream<String> strings = topology.strings("a", "b", "c", "d");
+
+ TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
+
+ // delay stream
+ starts = PlumbingStreams.blockingDelay(starts, 300, TimeUnit.MILLISECONDS);
+
+ // calculate display
+ starts = starts.modify(v -> System.currentTimeMillis() - v);
+
+ starts = starts.filter(v -> v >= 300);
+
+ Condition<Long> tc = topology.getTester().tupleCount(starts, 4);
+ complete(topology, tc);
+ assertTrue("valid:" + tc.getResult(), tc.valid());
+ }
+
+ @Test
+ public void testBlockingThrottle() throws Exception {
+ // Timing variances on shared machines can cause this test to fail
+ assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+ Topology topology = newTopology();
+
+ TStream<String> strings = topology.strings("a", "b", "c", "d");
+
+ TStream<Long> emittedDelays = strings.map(v -> 0L);
+
+ // throttle stream
+ long[] lastEmittedTimestamp = { 0 };
+ emittedDelays = PlumbingStreams.blockingThrottle(emittedDelays, 300, TimeUnit.MILLISECONDS)
+ .map(t -> {
+ // compute the delay since the last emitted tuple
+ long now = System.currentTimeMillis();
+ if (lastEmittedTimestamp[0] == 0)
+ lastEmittedTimestamp[0] = now;
+ t = now - lastEmittedTimestamp[0];
+ lastEmittedTimestamp[0] = now;
+ // System.out.println("### "+t);
+ return t;
+ })
+ .map(t -> {
+ // simulate 200ms downstream processing delay
+ try {
+ Thread.sleep(TimeUnit.MILLISECONDS.toMillis(200));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } return t;
+ }) ;
+
+ // should end up with throttled delays close to 300 (not 500 like
+ // a blockingDelay() under these same conditions would yield)
+ emittedDelays = emittedDelays.filter(v -> v <= 320);
+
+ Condition<Long> tc = topology.getTester().tupleCount(emittedDelays, 4);
+ complete(topology, tc);
+ assertTrue("valid:" + tc.getResult(), tc.valid());
+ }
+
+ @Test
+ public void testOneShotDelay() throws Exception {
+
+ Topology topology = newTopology();
+
+ TStream<String> strings = topology.strings("a", "b", "c", "d");
+
+ TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
+
+ // delay stream
+ starts = PlumbingStreams.blockingOneShotDelay(starts, 300, TimeUnit.MILLISECONDS);
+
+ // calculate display
+ starts = starts.modify(v -> System.currentTimeMillis() - v);
+
+ // the first tuple shouldn't satisfy the predicate
+ starts = starts.filter(v -> v < 300);
+
+ Condition<Long> tc = topology.getTester().tupleCount(starts, 3);
+ complete(topology, tc);
+ assertTrue("valid:" + tc.getResult(), tc.valid());
+ }
+
+ public static class TimeAndId {
+ private static AtomicInteger ids = new AtomicInteger();
+ long ms;
+ final int id;
+
+ public TimeAndId() {
+ this.ms = System.currentTimeMillis();
+ this.id = ids.incrementAndGet();
+ }
+ public TimeAndId(TimeAndId tai) {
+ this.ms = System.currentTimeMillis() - tai.ms;
+ this.id = tai.id;
+ }
+ @Override
+ public String toString() {
+ return "TAI:" + id + "@" + ms;
+ }
+
+ }
+
+ @Test
+ public void testPressureReliever() throws Exception {
+ // Timing variances on shared machines can cause this test to fail
+ assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+ Topology topology = newTopology();
+
+ TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS);
+
+
+ TStream<TimeAndId> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 5);
+
+ // insert a blocking delay acting as downstream operator that cannot keep up
+ TStream<TimeAndId> slow = PlumbingStreams.blockingDelay(pr, 200, TimeUnit.MILLISECONDS);
+
+ // calculate the delay
+ TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v));
+
+ // Also process raw that should be unaffected by the slow path
+ TStream<String> processed = raw.asString();
+
+
+ Condition<Long> tcSlowCount = topology.getTester().atLeastTupleCount(slow, 20);
+ Condition<List<TimeAndId>> tcRaw = topology.getTester().streamContents(raw);
+ Condition<List<TimeAndId>> tcSlow = topology.getTester().streamContents(slow);
+ Condition<List<TimeAndId>> tcSlowM = topology.getTester().streamContents(slowM);
+ Condition<List<String>> tcProcessed = topology.getTester().streamContents(processed);
+ complete(topology, tcSlowCount);
+
+ assertTrue(tcProcessed.getResult().size() > tcSlowM.getResult().size());
+ for (TimeAndId delay : tcSlowM.getResult())
+ assertTrue(delay.ms < 300);
+
+ // Must not lose any tuples in the non relieving path
+ Set<TimeAndId> uniq = new HashSet<>(tcRaw.getResult());
+ assertEquals(tcRaw.getResult().size(), uniq.size());
+
+ // Must not lose any tuples in the non relieving path
+ Set<String> uniqProcessed = new HashSet<>(tcProcessed.getResult());
+ assertEquals(tcProcessed.getResult().size(), uniqProcessed.size());
+
+ assertEquals(uniq.size(), uniqProcessed.size());
+
+ // Might lose tuples, but must not have send duplicates
+ uniq = new HashSet<>(tcSlow.getResult());
+ assertEquals(tcSlow.getResult().size(), uniq.size());
+ }
+
+ @Test
+ public void testPressureRelieverWithInitialDelay() throws Exception {
+
+ Topology topology = newTopology();
+
+
+ TStream<String> raw = topology.strings("A", "B", "C", "D", "E", "F", "G", "H");
+
+ TStream<String> pr = PlumbingStreams.pressureReliever(raw, v -> 0, 100);
+
+ TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 5, TimeUnit.SECONDS);
+
+ Condition<Long> tcCount = topology.getTester().tupleCount(pr2, 8);
+ Condition<List<String>> contents = topology.getTester().streamContents(pr2, "A", "B", "C", "D", "E", "F", "G", "H");
+ complete(topology, tcCount);
+
+ assertTrue(tcCount.valid());
+ assertTrue(contents.valid());
+ }
+
+ @Test
+ public void testValveState() throws Exception {
+ Valve<Integer> valve = new Valve<>();
+ assertTrue(valve.isOpen());
+
+ valve = new Valve<>(true);
+ assertTrue(valve.isOpen());
+
+ valve = new Valve<>(false);
+ assertFalse(valve.isOpen());
+
+ valve.setOpen(true);
+ assertTrue(valve.isOpen());
+
+ valve.setOpen(false);
+ assertFalse(valve.isOpen());
+ }
+
+ @Test
+ public void testValveInitiallyOpen() throws Exception {
+ Topology top = newTopology("testValve");
+
+ TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+ Valve<Integer> valve = new Valve<>();
+ AtomicInteger cnt = new AtomicInteger();
+ TStream<Integer> filtered = values
+ .peek(tuple -> {
+ // reject 4,5,6
+ int curCnt = cnt.incrementAndGet();
+ if (curCnt > 6)
+ valve.setOpen(true);
+ else if (curCnt > 3)
+ valve.setOpen(false);
+ })
+ .filter(valve);
+
+ Condition<Long> count = top.getTester().tupleCount(filtered, 7);
+ Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 1,2,3,7,8,9,10 );
+ complete(top, count);
+ assertTrue(contents.getResult().toString(), contents.valid());
+ }
+
+ @Test
+ public void testValveInitiallyClosed() throws Exception {
+ Topology top = newTopology("testValve");
+
+ TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+ Valve<Integer> valve = new Valve<>(false);
+
+ AtomicInteger cnt = new AtomicInteger();
+ TStream<Integer> filtered = values
+ .peek(tuple -> {
+ // reject all but 4,5,6
+ int curCnt = cnt.incrementAndGet();
+ if (curCnt > 6)
+ valve.setOpen(false);
+ else if (curCnt > 3)
+ valve.setOpen(true);
+ })
+ .filter(valve);
+
+ Condition<Long> count = top.getTester().tupleCount(filtered, 3);
+ Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 4,5,6 );
+ complete(top, count);
+ assertTrue(contents.getResult().toString(), contents.valid());
+ }
+
+ private Function<Integer,JsonObject> fakeAnalytic(int channel, long period, TimeUnit unit) {
+ return value -> {
+ try {
+ Thread.sleep(unit.toMillis(period));
+ JsonObject jo = new JsonObject();
+ jo.addProperty("channel", channel);
+ jo.addProperty("result", value);
+ return jo;
+ } catch (InterruptedException e) {
+ throw new RuntimeException("channel="+channel+" interrupted", e);
+ }
+ };
+ }
+
+ private Function<TStream<Integer>,TStream<JsonObject>> fakePipeline(int channel, long period, TimeUnit unit) {
+ return stream -> stream.map(fakeAnalytic(channel, period, unit)).filter(t->true).tag("pipeline-ch"+channel);
+ }
+
+ @Test
+ public void testConcurrentMap() throws Exception {
+ Topology top = newTopology("testConcurrentMap");
+
+ int ch = 0;
+ List<Function<Integer,JsonObject>> mappers = new ArrayList<>();
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ // a couple much faster just in case something's amiss with queues
+ mappers.add(fakeAnalytic(ch++, 3, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 13, TimeUnit.MILLISECONDS));
+
+ Function<List<JsonObject>,Integer> combiner = list -> {
+ int sum = 0;
+ int cnt = 0;
+ System.out.println("combiner: "+list);
+ for(JsonObject jo : list) {
+ assertEquals(cnt++, jo.get("channel").getAsInt());
+ sum += jo.get("result").getAsInt();
+ }
+ return sum;
+ };
+
+ TStream<Integer> values = top.of(1, 2, 3);
+ Integer[] resultTuples = new Integer[]{
+ 1*mappers.size(),
+ 2*mappers.size(),
+ 3*mappers.size(),
+ };
+
+ TStream<Integer> result = PlumbingStreams.concurrentMap(values, mappers, combiner);
+
+ Condition<Long> count = top.getTester().tupleCount(result, 3);
+ Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+
+ long begin = System.currentTimeMillis();
+ complete(top, count);
+ long end = System.currentTimeMillis();
+
+ assertTrue(contents.getResult().toString(), contents.valid());
+
+ long actDuration = end - begin;
+ long expMinSerialDuration = resultTuples.length * mappers.size() * 100;
+ long expMinDuration = resultTuples.length * 100;
+
+ System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+
+ // a gross level performance check w/concurrent channels
+ if (Boolean.getBoolean("edgent.build.ci"))
+ System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
+ else
+ assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+ actDuration < 0.5 * expMinSerialDuration);
+ }
+
+ @Test
+ public void testConcurrent() throws Exception {
+ Topology top = newTopology("testConcurrent");
+
+ int ch = 0;
+ List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>();
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+
+ Function<List<JsonObject>,Integer> combiner = list -> {
+ int sum = 0;
+ int cnt = 0;
+ System.out.println("combiner: "+list);
+ for(JsonObject jo : list) {
+ assertEquals(cnt++, jo.get("channel").getAsInt());
+ sum += jo.get("result").getAsInt();
+ }
+ return sum;
+ };
+
+ TStream<Integer> values = top.of(1, 2, 3);
+ Integer[] resultTuples = new Integer[]{
+ 1*pipelines.size(),
+ 2*pipelines.size(),
+ 3*pipelines.size(),
+ };
+
+ TStream<Integer> result = PlumbingStreams.concurrent(values, pipelines, combiner).tag("result");
+
+ Condition<Long> count = top.getTester().tupleCount(result, 3);
+ Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+
+ long begin = System.currentTimeMillis();
+ complete(top, count);
+ long end = System.currentTimeMillis();
+
+ assertTrue(contents.getResult().toString(), contents.valid());
+
+ long actDuration = end - begin;
+ long expMinSerialDuration = resultTuples.length * pipelines.size() * 100;
+ long expMinDuration = resultTuples.length * 100;
+
+ System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+
+ // a gross level performance check w/concurrent channels
+ if (Boolean.getBoolean("edgent.build.ci"))
+ System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
+ else
+ assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+ actDuration < 0.5 * expMinSerialDuration);
+ }
+
+ private BiFunction<Integer,Integer,JsonObject> fakeParallelAnalytic(long period, TimeUnit unit) {
+ return (value,channel) -> {
+ try {
+ Thread.sleep(unit.toMillis(period)); // simulate work for this period
+ JsonObject jo = new JsonObject();
+ jo.addProperty("channel", channel);
+ jo.addProperty("result", value);
+ return jo;
+ } catch (InterruptedException e) {
+ throw new RuntimeException("channel="+channel+" interrupted", e);
+ }
+ };
+ }
+
+ private BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> fakeParallelPipeline(long period, TimeUnit unit) {
+ return (stream,channel) -> stream
+ .map(value -> fakeParallelAnalytic(period, unit).apply(value,channel))
+ .filter(t->true)
+ .tag("pipeline-ch"+channel);
+ }
+
+ private Function<JsonObject,JsonObject> fakeJsonAnalytic(int channel, long period, TimeUnit unit) {
+ return jo -> {
+ try {
+ Thread.sleep(unit.toMillis(period)); // simulate work for this period
+ return jo;
+ } catch (InterruptedException e) {
+ throw new RuntimeException("channel="+channel+" interrupted", e);
+ }
+ };
+ }
+
+ @SuppressWarnings("unused")
+ private BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> fakeParallelPipelineTiming(long period, TimeUnit unit) {
+ return (stream,channel) -> stream
+ .map(jo -> { jo.addProperty("startPipelineMsec", System.currentTimeMillis());
+ return jo; })
+ .map(fakeJsonAnalytic(channel, period, unit))
+ .filter(t->true)
+ .map(jo -> { jo.addProperty("endPipelineMsec", System.currentTimeMillis());
+ return jo; })
+ .tag("pipeline-ch"+channel);
+ }
+
+ @Test
+ public void testParallelMap() throws Exception {
+ Topology top = newTopology("testParallelMap");
+
+ BiFunction<Integer,Integer,JsonObject> mapper =
+ fakeParallelAnalytic(100, TimeUnit.MILLISECONDS);
+
+ int width = 5;
+ ToIntFunction<Integer> splitter = tuple -> tuple % width;
+
+ Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+ TStream<Integer> values = top.of(resultTuples);
+
+ TStream<JsonObject> result = PlumbingStreams.parallelMap(values, width, splitter, mapper).tag("result");
+ TStream<Integer> result2 = result.map(jo -> {
+ int r = jo.get("result").getAsInt();
+ assertEquals(splitter.applyAsInt(r), jo.get("channel").getAsInt());
+ return r;
+ });
+
+ Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+ Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+
+ long begin = System.currentTimeMillis();
+ complete(top, count);
+ long end = System.currentTimeMillis();
+
+ assertTrue(contents.getResult().toString(), contents.valid());
+
+ long actDuration = end - begin;
+ long expMinSerialDuration = resultTuples.length * 100;
+ long expMinDuration = (resultTuples.length / width) * 100;
+
+ System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+
+ // a gross level performance check w/parallel channels
+ if (Boolean.getBoolean("edgent.build.ci"))
+ System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
+ else
+ assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+ actDuration < 0.5 * expMinSerialDuration);
+ }
+
+ @Test
+ public void testParallel() throws Exception {
+ Topology top = newTopology("testParallel");
+
+ BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline =
+ fakeParallelPipeline(100, TimeUnit.MILLISECONDS);
+
+ int width = 5;
+ ToIntFunction<Integer> splitter = tuple -> tuple % width;
+
+ Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+ TStream<Integer> values = top.of(resultTuples);
+
+ TStream<JsonObject> result = PlumbingStreams.parallel(values, width, splitter, pipeline).tag("result");
+ TStream<Integer> result2 = result.map(jo -> {
+ int r = jo.get("result").getAsInt();
+ assertEquals(splitter.applyAsInt(r), jo.get("channel").getAsInt());
+ return r;
+ });
+
+ Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+ Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+
+ long begin = System.currentTimeMillis();
+ complete(top, count);
+ long end = System.currentTimeMillis();
+
+ assertTrue(contents.getResult().toString(), contents.valid());
+
+ long actDuration = end - begin;
+ long expMinSerialDuration = resultTuples.length * 100;
+ long expMinDuration = (resultTuples.length / width) * 100;
+
+ System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+
+ // a gross level performance check w/parallel channels
+ if (Boolean.getBoolean("edgent.build.ci"))
+ System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
+ else
+ assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+ actDuration < 0.5 * expMinSerialDuration);
+ }
+
+ @Test
+ public void testParallelBalanced() throws Exception {
+ // May need tweak validation sensitivity or add this:
+ // Timing variances on shared machines can cause this test to fail
+ // assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+ Topology top = newTopology("testParallelBalanced");
+
+ // arrange for even channels to process ~2x as many as odd channels.
+ BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline =
+ (stream,ch) -> {
+ long delay = (ch % 2 == 0) ? 10 : 20;
+ return stream.map(fakeAnalytic(ch, delay, TimeUnit.MILLISECONDS));
+ };
+
+ int width = 4;
+ int tupCnt = 60;
+ Integer[] resultTuples = new Integer[tupCnt];
+ for (int i = 0; i < tupCnt; i++)
+ resultTuples[i] = i;
+ AtomicInteger[] chCounts = new AtomicInteger[width];
+ for (int ch = 0; ch < width; ch++)
+ chCounts[ch] = new AtomicInteger();
+
+ TStream<Integer> values = top.of(resultTuples);
+
+ TStream<JsonObject> result = PlumbingStreams.parallelBalanced(values, width, pipeline).tag("result");
+ TStream<Integer> result2 = result.map(jo -> {
+ int r = jo.get("result").getAsInt();
+ int ch = jo.get("channel").getAsInt();
+ chCounts[ch].incrementAndGet();
+ return r;
+ });
+
+ Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+ Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+
+ long begin = System.currentTimeMillis();
+ complete(top, count);
+ long end = System.currentTimeMillis();
+
+ assertTrue(contents.getResult().toString(), contents.valid());
+
+ long actDuration = end - begin;
+ long expMinSerialDuration = resultTuples.length * 20;
+ long expMinDuration = (resultTuples.length / width) * 20;
+
+ System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+ System.out.println(top.getName()+" chCounts="+Arrays.asList(chCounts));
+
+ // a gross level performance check w/parallel channels
+ if (Boolean.getBoolean("edgent.build.ci"))
+ System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
+ else
+ assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+ actDuration < 0.5 * expMinSerialDuration);
+
+ int evenChCounts = 0;
+ int oddChCounts = 0;
+ for (int ch = 0; ch < width; ch++) {
+ assertTrue(chCounts[ch].get() != 0);
+ if (ch % 2 == 0)
+ evenChCounts += chCounts[ch].get();
+ else
+ oddChCounts += chCounts[ch].get();
+ }
+ assertTrue(oddChCounts > 0.4 * evenChCounts
+ && oddChCounts < 0.6 * evenChCounts);
+ }
+
+// @Test
+// public void testParallelTiming() throws Exception {
+// Topology top = newTopology("testParallelTiming");
+//
+// BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> pipeline =
+// fakeParallelPipelineTiming(100, TimeUnit.MILLISECONDS);
+//
+// int width = 5;
+// // ToIntFunction<Integer> splitter = tuple -> tuple % width;
+// ToIntFunction<JsonObject> splitter = jo -> jo.get("result").getAsInt() % width;
+//
+// Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+// TStream<Integer> values = top.of(resultTuples);
+//
+// TStream<JsonObject> inStream = values.map(value -> {
+// JsonObject jo = new JsonObject();
+// jo.addProperty("result", value);
+// jo.addProperty("channel", splitter.applyAsInt(jo));
+// jo.addProperty("enterParallelMsec", System.currentTimeMillis());
+// return jo;
+// });
+// TStream<JsonObject> result = PlumbingStreams.parallel(inStream, width, splitter, pipeline).tag("result");
+// TStream<Integer> result2 = result.map(jo -> {
+// jo.addProperty("exitParallelMsec", System.currentTimeMillis());
+// System.out.println("ch="+jo.get("channel").getAsInt()
+// +" endPipeline-startPipeline="
+// +(jo.get("endPipelineMsec").getAsLong()
+// - jo.get("startPipelineMsec").getAsLong())
+// +" exitParallel-startPipeine="
+// +(jo.get("exitParallelMsec").getAsLong()
+// - jo.get("startPipelineMsec").getAsLong()));
+// int r = jo.get("result").getAsInt();
+// assertEquals(splitter.applyAsInt(jo), jo.get("channel").getAsInt());
+// return r;
+// });
+//
+// Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+// Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+// long begin = System.currentTimeMillis();
+// complete(top, count);
+// long end = System.currentTimeMillis();
+// assertTrue(contents.getResult().toString(), contents.valid());
+//
+// long actDuration = end - begin;
+//
+// long expMinSerialDuration = resultTuples.length * 100;
+// long expMinDuration = (resultTuples.length / width) * 100;
+//
+// System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+//
+// // a gross level performance check w/parallel channels
+// assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+// actDuration < 0.5 * expMinSerialDuration);
+// }
+
+ @Test
+ public void testGate() throws Exception {
+ Topology topology = newTopology("testGate");
+
+ TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
+
+ Semaphore semaphore = new Semaphore(1);
+ raw = PlumbingStreams.gate(raw, semaphore);
+
+ ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
+ ArrayList<Integer> arrayResult = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ arrayResult.add(0);
+ arrayResult.add(1);
+ }
+
+ raw.sink(t -> {
+ //Add 0 to list because semaphore.acquire() in sync has occurred
+ resultAvailablePermits.add(semaphore.availablePermits());
+ semaphore.release();
+ //Add 1 to list because semaphore.release() has executed
+ resultAvailablePermits.add(semaphore.availablePermits());
+ });
+
+ Condition<List<String>> contents = topology.getTester()
+ .streamContents(raw, "a", "b", "c", "d", "e");
+ complete(topology, contents);
+
+ assertTrue("valid:" + contents.getResult(), contents.valid());
+ assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
+ }
+
+ @Test
+ public void testGateWithLocking() throws Exception {
+ Topology topology = newTopology("testGateWithLocking");
+
+ TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
+
+ Semaphore semaphore = new Semaphore(3);
+ raw = PlumbingStreams.gate(raw, semaphore);
+
+ ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
+ ArrayList<Integer> arrayResult = new ArrayList<>();
+ arrayResult.add(2);
+ arrayResult.add(1);
+ arrayResult.add(0);
+
+ raw.sink(t -> {
+ //Add number of availablePermits
+ resultAvailablePermits.add(semaphore.availablePermits());
+ });
+
+ Condition<List<String>> contents = topology.getTester().streamContents(raw, "a", "b", "c");
+ complete(topology, contents, 1000, TimeUnit.MILLISECONDS);
+
+ assertTrue("valid:" + contents.getResult(), contents.valid());
+ assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java
new file mode 100644
index 0000000..7625667
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java
@@ -0,0 +1,875 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TWindow;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+public abstract class TStreamTest extends TopologyAbstractTest {
+
+ @Test
+ public void testAlias() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a", "b");
+ assertEquals(null, s.getAlias());
+
+ TStream<String> s2 = s.alias("sAlias");
+ assertSame(s, s2);
+ assertEquals("sAlias", s.getAlias());
+
+ try {
+ s.alias("another"); // expect ISE - alias already set
+ assertTrue(false);
+ } catch (IllegalStateException e) {
+ ; // expected
+ }
+
+ // test access at runtime
+ s2 = s.peek(tuple -> {
+ assertEquals("sAlias", s.getAlias());
+ }).filter(tuple -> true);
+
+ // just verify that alias presence doesn't otherwise muck up things
+ Condition<Long> tc = t.getTester().tupleCount(s2, 2);
+ Condition<List<String>> contents = t.getTester().streamContents(s2, "a", "b");
+ complete(t, tc);
+
+ assertTrue("contents "+contents.getResult(), contents.valid());
+ }
+
+ @Test
+ public void testTag() throws Exception {
+
+ Topology t = newTopology();
+
+ List<String> tags = new ArrayList<>(Arrays.asList("tag1", "tag2"));
+
+ TStream<String> s = t.strings("a", "b");
+ assertEquals(0, s.getTags().size());
+
+ TStream<String> s2 = s.tag("tag1", "tag2");
+ assertSame(s, s2);
+ assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
+
+ tags.add("tag3");
+ s.tag("tag3");
+ assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
+
+ s.tag("tag3", "tag2", "tag1"); // ok to redundantly add
+ assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
+
+ // test access at runtime
+ s2 = s.peek(tuple -> {
+ assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
+ }).filter(tuple -> true);
+
+ // just verify that tag presence doesn't otherwise muck up things
+ Condition<Long> tc = t.getTester().tupleCount(s2, 2);
+ Condition<List<String>> contents = t.getTester().streamContents(s2, "a", "b");
+ complete(t, tc);
+
+ assertTrue("contents "+contents.getResult(), contents.valid());
+ }
+
+ @Test
+ public void testFilter() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a", "b", "c");
+ s = s.filter(tuple -> "b".equals(tuple));
+ assertStream(t, s);
+
+ Condition<Long> tc = t.getTester().tupleCount(s, 1);
+ Condition<List<String>> contents = t.getTester().streamContents(s, "b");
+ complete(t, tc);
+
+ assertTrue(contents.valid());
+ }
+
+ /**
+ * Test Peek. This will only work with an embedded setup.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testPeek() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a", "b", "c");
+ List<String> peekedValues = new ArrayList<>();
+ TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple));
+ assertSame(s, speek);
+
+ Condition<Long> tc = t.getTester().tupleCount(s, 3);
+ Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
+ complete(t, tc);
+
+ assertTrue(contents.valid());
+ assertEquals(contents.getResult(), peekedValues);
+ }
+
+ @Test
+ public void testMultiplePeek() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a", "b", "c");
+ List<String> peekedValues = new ArrayList<>();
+ TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple + "1st"));
+ assertSame(s, speek);
+
+ TStream<String> speek2 = s.peek(tuple -> peekedValues.add(tuple + "2nd"));
+ assertSame(s, speek2);
+ TStream<String> speek3 = s.peek(tuple -> peekedValues.add(tuple + "3rd"));
+ assertSame(s, speek3);
+
+ Condition<Long> tc = t.getTester().tupleCount(s, 3);
+ Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
+ complete(t, tc);
+
+ assertTrue(contents.valid());
+ List<String> expected = Arrays.asList("a1st", "a2nd", "a3rd", "b1st", "b2nd", "b3rd", "c1st", "c2nd", "c3rd");
+ assertEquals(expected, peekedValues);
+ }
+
+ @Test
+ public void testMap() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("32", "423", "-746");
+ TStream<Integer> i = s.map(Integer::valueOf);
+ assertStream(t, i);
+
+ Condition<Long> tc = t.getTester().tupleCount(i, 3);
+ Condition<List<Integer>> contents = t.getTester().streamContents(i, 32, 423, -746);
+ complete(t, tc);
+
+ assertTrue(contents.getResult().toString(), contents.valid());
+ }
+
+ @Test
+ public void testModifyWithDrops() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("32", "423", "-746");
+ TStream<Integer> i = s.map(Integer::valueOf);
+ i = i.modify(tuple -> tuple < 0 ? null : tuple + 27);
+ assertStream(t, i);
+
+ Condition<Long> tc = t.getTester().tupleCount(i, 2);
+ Condition<List<Integer>> contents = t.getTester().streamContents(i, 59, 450);
+ complete(t, tc);
+
+ assertTrue(contents.getResult().toString(), contents.valid());
+ }
+
+ @Test
+ public void testModify() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a", "b", "c");
+ TStream<String> i = s.modify(tuple -> tuple.concat("M"));
+ assertStream(t, i);
+
+ Condition<Long> tc = t.getTester().tupleCount(i, 3);
+ Condition<List<String>> contents = t.getTester().streamContents(i, "aM", "bM", "cM");
+ complete(t, tc);
+
+ assertTrue(contents.valid());
+ }
+
+ @Test
+ public void tesFlattMap() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("mary had a little lamb",
+ "its fleece was white as snow");
+ TStream<String> w = s.flatMap(tuple->Arrays.asList(tuple.split(" ")));
+ assertStream(t, w);
+
+ Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
+ "a", "little", "lamb", "its", "fleece", "was", "white", "as",
+ "snow");
+ complete(t, contents);
+
+ assertTrue(contents.getResult().toString(), contents.valid());
+ }
+
+ @Test
+ public void tesFlattMapWithNullIterator() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("mary had a little lamb", "NOTUPLES",
+ "its fleece was white as snow");
+ TStream<String> w = s.flatMap(tuple->tuple.equals("NOTUPLES") ? null : Arrays.asList(tuple.split(" ")));
+ assertStream(t, w);
+
+ Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
+ "a", "little", "lamb", "its", "fleece", "was", "white", "as",
+ "snow");
+ complete(t, contents);
+
+ assertTrue(contents.getResult().toString(), contents.valid());
+ }
+
+ @Test
+ public void tesFlattMapWithNullValues() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("mary had a little lamb",
+ "its fleece was white as snow");
+ TStream<String> w = s.flatMap(tuple-> {List<String> values = Arrays.asList(tuple.split(" "));
+ values.set(2, null); values.set(4, null); return values;});
+ assertStream(t, w);
+
+ Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
+ "little", "its", "fleece", "white",
+ "snow");
+ complete(t, contents);
+
+ assertTrue(contents.getResult().toString(), contents.valid());
+ }
+
+ /**
+ * Test split() with no drops.
+ * @throws Exception on failure
+ */
+ @Test
+ public void testSplit() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a1", "b1", "a2", "c1", "e1", "c2", "c3", "b2", "a3", "b3", "d1", "e2");
+ List<TStream<String>> splits = s.split(3, tuple -> tuple.charAt(0) - 'a');
+
+ Condition<Long> tc0 = t.getTester().tupleCount(splits.get(0), 4);
+ Condition<Long> tc1 = t.getTester().tupleCount(splits.get(1), 5);
+ Condition<Long> tc2 = t.getTester().tupleCount(splits.get(2), 3);
+ Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(0), "a1", "a2", "a3", "d1");
+ Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(1), "b1", "e1", "b2", "b3", "e2");
+ Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(2), "c1", "c2", "c3");
+
+ complete(t, t.getTester().and(tc0, tc1, tc2));
+
+ assertTrue(contents0.toString(), contents0.valid());
+ assertTrue(contents1.toString(), contents1.valid());
+ assertTrue(contents2.toString(), contents2.valid());
+ }
+
+ /**
+ * Test split() with drops.
+ * @throws Exception on failure
+ */
+ @Test
+ public void testSplitWithDrops() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a1", "b1", "a2", "c1", "e1", "c2", "c3", "b2", "a3", "b3", "d1", "e2");
+ List<TStream<String>> splits = s.split(3, tuple -> {
+ switch (tuple.charAt(0)) {
+ case 'a':
+ return 1;
+ case 'b':
+ return 4;
+ case 'c':
+ return 8;
+ case 'd':
+ return -34;
+ case 'e':
+ return -1;
+ default:
+ return -1;
+ }
+ });
+
+ Condition<Long> tc0 = t.getTester().tupleCount(splits.get(0), 0);
+ Condition<Long> tc1 = t.getTester().tupleCount(splits.get(1), 6);
+ Condition<Long> tc2 = t.getTester().tupleCount(splits.get(2), 3);
+ Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(0));
+ Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(1), "a1", "b1", "a2", "b2", "a3",
+ "b3");
+ Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(2), "c1", "c2", "c3");
+
+ complete(t, t.getTester().and(tc0, tc1, tc2));
+
+ assertTrue(contents0.toString(), contents0.valid());
+ assertTrue(contents1.toString(), contents1.valid());
+ assertTrue(contents2.toString(), contents2.valid());
+ }
+
+ /**
+ * Test split() zero outputs
+ * @throws Exception on failure
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testSplitWithZeroOutputs() throws Exception {
+ newTopology().strings("a1").split(0, tuple -> 0);
+ }
+
+ /**
+ * Test split() negative outputs
+ * @throws Exception on failure
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testSplitWithNegativeOutputs() throws Exception {
+ newTopology().strings("a1").split(-28, tuple -> 0);
+ }
+
+ /**
+ * Test enum data structure
+ */
+ private enum LogSeverityEnum {
+ ALERT(1), CRITICAL(2), ERROR(3), WARNING(4), NOTICE(5), INFO(6), DEBUG(7);
+
+ @SuppressWarnings("unused")
+ private final int code;
+
+ LogSeverityEnum(final int code) {
+ this.code = code;
+ }
+ }
+
+ /**
+ * Test split(enum) with integer type enum.
+ * @throws Exception on failure
+ */
+ @Test
+ public void testSplitWithEnum() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("Log1_ALERT", "Log2_INFO", "Log3_INFO", "Log4_INFO", "Log5_ERROR", "Log6_ERROR", "Log7_CRITICAL");
+ TStream<String> i = s.map(String::toString);
+ EnumMap<LogSeverityEnum,TStream<String>> splits = i.split(LogSeverityEnum.class, e -> LogSeverityEnum.valueOf(e.split("_")[1]));
+
+ assertStream(t, i);
+
+ Condition<Long> tc0 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ALERT), 1);
+ Condition<Long> tc1 = t.getTester().tupleCount(splits.get(LogSeverityEnum.INFO), 3);
+ Condition<Long> tc2 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ERROR), 2);
+ Condition<Long> tc3 = t.getTester().tupleCount(splits.get(LogSeverityEnum.CRITICAL), 1);
+ Condition<Long> tc4 = t.getTester().tupleCount(splits.get(LogSeverityEnum.WARNING), 0);
+
+ Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(LogSeverityEnum.ALERT), "Log1_ALERT");
+ Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(LogSeverityEnum.INFO), "Log2_INFO",
+ "Log3_INFO", "Log4_INFO");
+ Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(LogSeverityEnum.ERROR), "Log5_ERROR",
+ "Log6_ERROR");
+ Condition<List<String>> contents3 = t.getTester().streamContents(splits.get(LogSeverityEnum.CRITICAL), "Log7_CRITICAL");
+ Condition<List<String>> contents4 = t.getTester().streamContents(splits.get(LogSeverityEnum.WARNING));
+
+ complete(t, t.getTester().and(tc0, tc1, tc2, tc3, tc4));
+
+
+ assertTrue(contents0.toString(), contents0.valid());
+ assertTrue(contents1.toString(), contents1.valid());
+ assertTrue(contents2.toString(), contents2.valid());
+ assertTrue(contents3.toString(), contents3.valid());
+ assertTrue(contents4.toString(), contents4.valid());
+ }
+
+ private enum EnumClassWithZerosize {
+ ;
+ }
+
+ /**
+ * Test split(enum) with integer type enum.
+ * @throws Exception on failure
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testSplitWithEnumForZeroSizeClass() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("Test");
+ s.split(EnumClassWithZerosize.class, e -> EnumClassWithZerosize.valueOf("Test"));
+ }
+
+ @Test
+ public void testFanout2() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a", "b", "c");
+ TStream<String> sf = s.filter(tuple -> "b".equals(tuple));
+ TStream<String> sm = s.modify(tuple -> tuple.concat("fo2"));
+
+ Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
+ Condition<List<String>> tsf = t.getTester().streamContents(sf, "b");
+ Condition<List<String>> tsm = t.getTester().streamContents(sm, "afo2", "bfo2", "cfo2");
+
+ complete(t, t.getTester().and(tsm, tsmc));
+
+ assertTrue(tsf.getResult().toString(), tsf.valid());
+ assertTrue(tsm.getResult().toString(), tsm.valid());
+ }
+ @Test
+ public void testFanout3() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a", "b", "cde");
+ TStream<String> sf = s.filter(tuple -> "b".equals(tuple));
+ TStream<String> sm = s.modify(tuple -> tuple.concat("fo2"));
+ TStream<byte[]> st = s.map(tuple -> tuple.getBytes());
+
+ Condition<Long> tsfc = t.getTester().tupleCount(sf, 1);
+ Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
+ Condition<Long> tstc = t.getTester().tupleCount(st, 3);
+ Condition<List<String>> tsf = t.getTester().streamContents(sf, "b");
+ Condition<List<String>> tsm = t.getTester().streamContents(sm, "afo2", "bfo2", "cdefo2");
+ Condition<List<byte[]>> tst = t.getTester().streamContents(st, "a".getBytes(), "b".getBytes(), "cde".getBytes());
+
+ complete(t, t.getTester().and(tsfc, tsmc, tstc));
+
+ assertTrue(tsf.valid());
+ assertTrue(tsm.valid());
+
+ // Can't use equals on byte[]
+ assertEquals(3, tst.getResult().size());
+ assertEquals("a", new String(tst.getResult().get(0)));
+ assertEquals("b", new String(tst.getResult().get(1)));
+ assertEquals("cde", new String(tst.getResult().get(2)));
+ }
+
+ @Test
+ public void testPeekThenFanout() throws Exception {
+ _testFanoutWithPeek(1, 0, 0);
+ }
+
+ @Test
+ public void testFanoutThenPeek() throws Exception {
+ _testFanoutWithPeek(0, 0, 1);
+ }
+
+ @Test
+ public void testPeekMiddleFanout() throws Exception {
+ _testFanoutWithPeek(0, 1, 0);
+ }
+
+ @Test
+ public void testMultiPeekFanout() throws Exception {
+ _testFanoutWithPeek(3, 3, 3);
+ }
+
+ void _testFanoutWithPeek(int numBefore, int numMiddle, int numAfter) throws Exception {
+
+ Topology t = newTopology();
+
+ List<Peeked> values = new ArrayList<>();
+ values.add(new Peeked(33));
+ values.add(new Peeked(-214));
+ values.add(new Peeked(9234));
+ for (Peeked p : values)
+ assertEquals(0, p.peekedCnt);
+
+ TStream<Peeked> s = t.collection(values);
+ if (numBefore > 0) {
+ for (int i = 0; i < numBefore; i++)
+ s.peek(tuple -> tuple.peekedCnt++);
+ }
+
+ TStream<Peeked> sf = s.filter(tuple -> tuple.value > 0);
+ if (numMiddle > 0) {
+ for (int i = 0; i < numMiddle; i++)
+ s.peek(tuple -> tuple.peekedCnt++);
+ }
+ TStream<Peeked> sm = s.modify(tuple -> new Peeked(tuple.value + 37, tuple.peekedCnt));
+
+ if (numAfter > 0) {
+ for (int i = 0; i < numAfter; i++)
+ s.peek(tuple -> tuple.peekedCnt++);
+ }
+
+ int totPeeks = numBefore + numMiddle + numAfter;
+ Condition<Long> tsfc = t.getTester().tupleCount(sf, 2);
+ Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
+ Condition<List<Peeked>> tsf = t.getTester().streamContents(sf, new Peeked(33, totPeeks), new Peeked(9234, totPeeks));
+ Condition<List<Peeked>> tsm = t.getTester().streamContents(sm, new Peeked(70, totPeeks), new Peeked(-177, totPeeks),
+ new Peeked(9271, totPeeks));
+
+ complete(t, t.getTester().and(tsfc, tsmc));
+
+ assertTrue(tsf.getResult().toString(), tsf.valid());
+ assertTrue(tsm.getResult().toString(), tsm.valid());
+ }
+
+ public static class Peeked implements Serializable {
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + peekedCnt;
+ result = prime * result + value;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Peeked other = (Peeked) obj;
+ if (peekedCnt != other.peekedCnt)
+ return false;
+ if (value != other.value)
+ return false;
+ return true;
+ }
+
+ private static final long serialVersionUID = 1L;
+ final int value;
+ int peekedCnt;
+
+ Peeked(int value) {
+ this.value = value;
+ }
+
+ Peeked(int value, boolean peeked) {
+ this(value, 1);
+ }
+
+ Peeked(int value, int peekedCnt) {
+ this.value = value;
+ // this.peeked = true;
+ this.peekedCnt = peekedCnt;
+ }
+
+ public String toString() {
+ return "{" + "value=" + value + " peekedCnt=" + peekedCnt + "}";
+ }
+ }
+
+ /**
+ * Test Union with itself.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testUnionWithSelf() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a", "b", "c");
+
+ assertSame(s, s.union(s));
+ assertSame(s, s.union(Collections.emptySet()));
+ assertSame(s, s.union(Collections.singleton(s)));
+ }
+
+ @Test
+ public void testUnion2() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s1 = t.strings("a", "b", "c");
+ TStream<String> s2 = t.strings("d", "e");
+ TStream<String> su = s1.union(s2);
+ assertNotSame(s1, su);
+ assertNotSame(s2, su);
+ TStream<String> r = su.modify(v -> v.concat("X"));
+
+ Condition<Long> tc = t.getTester().tupleCount(r, 5);
+ Condition<List<String>> contents = t.getTester().contentsUnordered(r,
+ "aX", "bX", "cX", "dX", "eX");
+ complete(t, tc);
+
+ assertTrue(tc.getResult().toString(), tc.valid());
+ assertTrue(contents.getResult().toString(), contents.valid());
+ }
+
+ @Test
+ public void testUnion4() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s1 = t.strings("a", "b", "c");
+ TStream<String> s2 = t.strings("d", "e");
+ TStream<String> s3 = t.strings("f", "g", "h", "i");
+ TStream<String> s4 = t.strings("j");
+ TStream<String> su = s1.union(new HashSet<>(Arrays.asList(s2, s3, s4)));
+ assertNotSame(s1, su);
+ assertNotSame(s2, su);
+ assertNotSame(s3, su);
+ assertNotSame(s4, su);
+ TStream<String> r = su.modify(v -> v.concat("Y"));
+
+ Condition<Long> tc = t.getTester().tupleCount(r, 10);
+ Condition<List<String>> contents = t.getTester().contentsUnordered(r,
+ "aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
+ complete(t, tc);
+
+ assertTrue(tc.getResult().toString(), tc.valid());
+ assertTrue(contents.getResult().toString(), contents.valid());
+ }
+
+ @Test
+ public void testUnion4WithSelf() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s1 = t.strings("a", "b", "c");
+ TStream<String> s2 = t.strings("d", "e");
+ TStream<String> s3 = t.strings("f", "g", "h", "i");
+ TStream<String> s4 = t.strings("j");
+ TStream<String> su = s1.union(new HashSet<>(Arrays.asList(s1, s2, s3, s4)));
+ assertNotSame(s1, su);
+ assertNotSame(s2, su);
+ assertNotSame(s3, su);
+ assertNotSame(s4, su);
+ TStream<String> r = su.modify(v -> v.concat("Y"));
+
+ Condition<Long> tc = t.getTester().tupleCount(r, 10);
+ Condition<List<String>> contents = t.getTester().contentsUnordered(r,
+ "aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
+ complete(t, tc);
+
+ assertTrue(tc.getResult().toString(), tc.valid());
+ assertTrue(contents.getResult().toString(), contents.valid());
+ }
+
+ @Test
+ public void testSink() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a", "b", "c");
+
+ List<String> sinked = new ArrayList<>();
+ TSink<String> terminal = s.sink(tuple -> sinked.add(tuple));
+ assertNotNull(terminal);
+ assertSame(t, terminal.topology());
+ assertSame(s, terminal.getFeed());
+ TStream<String> s1 = s.filter(tuple -> true);
+
+ Condition<Long> tc = t.getTester().tupleCount(s1, 3);
+ complete(t, tc);
+
+ assertEquals("a", sinked.get(0));
+ assertEquals("b", sinked.get(1));
+ assertEquals("c", sinked.get(2));
+ }
+
+ /**
+ * Submit multiple jobs concurrently using ProcessSource.
+ * @throws Exception on failure
+ */
+ @Test
+ public void testMultiTopology() throws Exception {
+
+ int executions = 4;
+ ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(
+ Executors.newFixedThreadPool(executions));
+ for (int i = 0; i < executions; i++) {
+ completer.submit(() -> {
+ Topology t = newTopology();
+ TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h");
+ s.sink((tuple) -> { if ("h".equals(tuple)) System.out.println(tuple);});
+ Condition<Long> tc = t.getTester().tupleCount(s, 8);
+ complete(t, tc);
+ return true;
+ });
+ }
+ waitForCompletion(completer, executions);
+ }
+
+ /**
+ * Submit multiple jobs concurrently using ProcessSource.
+ * @throws Exception on failure
+ */
+ @Test
+ public void testMultiTopologyWithError() throws Exception {
+
+ int executions = 4;
+ ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(
+ Executors.newFixedThreadPool(executions));
+ for (int i = 0; i < executions; i++) {
+ completer.submit(() -> {
+ Topology t = newTopology();
+ TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h");
+ // Throw on the 8th tuple
+ s.sink((tuple) -> { if ("h".equals(tuple)) throw new RuntimeException("Expected Test Exception");});
+ // Expect 7 tuples out of 8
+ Condition<Long> tc = t.getTester().tupleCount(s, 7);
+ complete(t, tc);
+ return true;
+ });
+ }
+ waitForCompletion(completer, executions);
+ }
+
+ /**
+ * Submit multiple jobs concurrently using PeriodicSource.
+ * @throws Exception on failure
+ */
+ @Test
+ public void testMultiTopologyPollWithError() throws Exception {
+
+ int executions = 4;
+ ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(
+ Executors.newFixedThreadPool(executions));
+ for (int i = 0; i < executions; i++) {
+ completer.submit(() -> {
+ Topology t = newTopology();
+ AtomicLong n = new AtomicLong(0);
+ TStream<Long> s = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
+ // Throw on the 8th tuple
+ s.sink((tuple) -> { if (8 == n.get()) throw new RuntimeException("Expected Test Exception");});
+ // Expect 7 tuples out of 8
+ Condition<Long> tc = t.getTester().tupleCount(s, 7);
+ complete(t, tc);
+ return true;
+ });
+ }
+ waitForCompletion(completer, executions);
+ }
+
+ @Test
+ public void testJoinWithWindow() throws Exception{
+ Topology t = newTopology();
+
+ List<Integer> ints = new ArrayList<>();
+ List<Integer> lookupInts = new ArrayList<>();
+
+ // Ints to populate the window
+ for(int i = 0; i < 100; i++){
+ ints.add(i);
+ }
+
+ // Ints to lookup partitions in window
+ for(int i = 0; i < 10; i++){
+ lookupInts.add(i);
+ }
+ TStream<Integer> intStream = t.collection(ints);
+
+ // Wait until the window is populated, and then submit tuples
+ TStream<Integer> lookupIntStream = t.source(() -> {
+ try {
+ Thread.sleep(500);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return lookupInts;
+ });
+
+ TWindow<Integer, Integer> window = intStream.last(10, tuple -> tuple % 10);
+ TStream<Integer> joinsHappened = lookupIntStream.join(tuple -> tuple % 10, window, (number, partitionContents) -> {
+ assertTrue(partitionContents.size() == 10);
+ for(Integer element : partitionContents)
+ assertTrue(number % 10 == element % 10);
+
+ // Causes an error if two numbers map to the same partition, which shouldn't happen
+ partitionContents.clear();
+ return 0;
+ });
+
+ Condition<Long> tc = t.getTester().tupleCount(joinsHappened, 10);
+ complete(t, tc);
+ }
+
+ @Test
+ public void testJoinLastWithKeyer() throws Exception{
+ Topology t = newTopology();
+
+ List<Integer> ints = new ArrayList<>();
+ for(int i = 0; i < 100; i++){
+ ints.add(i);
+ }
+
+ TStream<Integer> intStream = t.collection(ints);
+
+ // Wait until the window is populated, and then submit tuples
+ TStream<Integer> lookupIntStream = t.source(() -> {
+ try {
+ Thread.sleep(500);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return ints;
+ });
+
+ TStream<String> joinsHappened = lookupIntStream.joinLast(tuple -> tuple, intStream, tuple -> tuple, (a, b) -> {
+ assertTrue(a.equals(b));
+ return "0";
+ });
+
+ Condition<Long> tc = t.getTester().tupleCount(joinsHappened, 100);
+ complete(t, tc);
+ }
+
+ private void waitForCompletion(ExecutorCompletionService<Boolean> completer, int numtasks) throws ExecutionException {
+ int remainingTasks = numtasks;
+ while (remainingTasks > 0) {
+ try {
+ Future<Boolean> completed = completer.poll(4, TimeUnit.SECONDS);
+ if (completed == null) {
+ System.err.println("Completer timed out");
+ throw new RuntimeException(new TimeoutException());
+ }
+ else {
+ completed.get();
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ remainingTasks--;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TWindowTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TWindowTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TWindowTest.java
new file mode 100644
index 0000000..ce66653
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TWindowTest.java
@@ -0,0 +1,174 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.apache.edgent.function.Functions.identity;
+import static org.apache.edgent.function.Functions.unpartitioned;
+import static org.apache.edgent.function.Functions.zero;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TWindow;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+public abstract class TWindowTest extends TopologyAbstractTest{
+ @Test
+ public void testCountBasedBatch() throws Exception{
+ Topology top = newTopology();
+ List<Integer> intList = new ArrayList<>();
+ for(int i = 0; i < 1000;i++)
+ intList.add(i);
+ TStream<Integer> ints = top.source(() -> intList);
+
+ TWindow<Integer, Integer> window = ints.last(100, tuple -> 0);
+ TStream<Integer> sizes = window.batch((tuples, key) -> {
+ return tuples.size();
+ });
+ Condition<List<Integer> > contents = top.getTester().streamContents(sizes,
+ 100,100,100,100,100,100,100,100,100,100);
+ complete(top, contents);
+ assertTrue(contents.valid());
+ }
+
+ @Test
+ public void testTimeBasedBatch() throws Exception{
+ Topology top = newTopology();
+ TStream<Integer> ints = top.poll(() -> {
+ return 1;
+ }, 10, TimeUnit.MILLISECONDS);
+
+ TWindow<Integer, Integer> window = ints.last(1000, TimeUnit.MILLISECONDS, tuple -> 0);
+ TStream<Integer> sizes = window.batch((tuples, key) -> {
+ return tuples.size();
+ });
+
+ Condition<List<Integer> > contents = top.getTester().streamContents(sizes,
+ 100, 100, 100, 100, 100, 100, 100, 100, 100, 100);
+ complete(top, contents);
+ System.out.println(contents.getResult());
+ for(Integer size : contents.getResult()){
+ assertTrue(size >= 90 && size <= 110);
+ }
+ }
+
+ @Test
+ public void testKeyedWindowSum() throws Exception {
+ Topology t = newTopology();
+
+ TStream<Integer> integers = t.collection(Arrays.asList(1,2,3,4,4,3,4,4,3));
+ TWindow<Integer, Integer> window = integers.last(9, identity());
+ assertSame(identity(), window.getKeyFunction());
+ assertSame(t, window.topology());
+ assertSame(integers, window.feeder());
+
+ TStream<Integer> sums = window.aggregate((tuples, key) -> {
+ // All tuples in a partition are equal due to identity
+ assertEquals(1, new HashSet<>(tuples).size());
+ int sum = 0;
+ for(Integer tuple : tuples)
+ sum+=tuple;
+ return sum;
+ });
+
+ Condition<Long> tc = t.getTester().tupleCount(sums, 9);
+ Condition<List<Integer>> contents = t.getTester().streamContents(sums,
+ 1, 2, 3, 4, 8, 6, 12, 16, 9);
+ complete(t, tc);
+
+ assertTrue(contents.valid());
+ }
+
+ @Test
+ public void testWindowSum() throws Exception {
+ Topology t = newTopology();
+
+ TStream<Integer> integers = t.collection(Arrays.asList(1,2,3,4));
+ TWindow<Integer, Integer> window = integers.last(4, unpartitioned());
+ assertSame(unpartitioned(), window.getKeyFunction());
+ TStream<Integer> sums = window.aggregate((tuples, key) -> {
+ assertEquals(Integer.valueOf(0), key);
+ int sum = 0;
+ for(Integer tuple : tuples)
+ sum+=tuple;
+ return sum;
+ });
+
+ Condition<Long> tc = t.getTester().tupleCount(sums, 4);
+ Condition<List<Integer>> contents = t.getTester().streamContents(sums, 1, 3, 6, 10);
+ complete(t, tc);
+
+ assertTrue(contents.valid());
+ }
+
+ @Test
+ public void testTimeWindowTimeDiff() throws Exception {
+ // Timing variances on shared machines can cause this test to fail
+ assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+ Topology t = newTopology();
+
+ // Define data
+ ConcurrentLinkedQueue<Long> diffs = new ConcurrentLinkedQueue<>();
+
+ // Poll data
+ TStream<Long> times = t.poll(() -> {
+ return System.currentTimeMillis();
+ }, 1, TimeUnit.MILLISECONDS);
+
+ TWindow<Long, Integer> window = times.last(1, TimeUnit.SECONDS, unpartitioned());
+ assertSame(zero(), window.getKeyFunction());
+ TStream<Long> diffStream = window.aggregate((tuples, key) -> {
+ assertEquals(Integer.valueOf(0), key);
+ if(tuples.size() < 2){
+ return null;
+ }
+ return tuples.get(tuples.size() -1) - tuples.get(0);
+ });
+
+ diffStream.sink((tuple) -> diffs.add(tuple));
+
+ Condition<Long> tc = t.getTester().tupleCount(diffStream, 5000);
+ complete(t, tc);
+
+ for(Long diff : diffs){
+ assertTrue("Diff is: " + diff, diff >=0 && diff < 1060);
+ }
+
+ }
+
+ public static boolean withinTolerance(double expected, Double actual, double tolerance) {
+ double lowBound = (1.0 - tolerance) * expected;
+ double highBound = (1.0 + tolerance) * expected;
+ return (actual < highBound && actual > lowBound);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyAbstractTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyAbstractTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyAbstractTest.java
new file mode 100644
index 0000000..fd413a9
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyAbstractTest.java
@@ -0,0 +1,107 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Submitter;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyProvider;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Before;
+import org.junit.Ignore;
+
+import com.google.gson.JsonObject;
+
+@Ignore("Abstract class proiding generic topology testing.")
+public abstract class TopologyAbstractTest implements TopologyTestSetup {
+
+ private TopologyProvider topologyProvider;
+ private Submitter<Topology, Job> submitter;
+
+ @Before
+ public void setup() {
+ topologyProvider = createTopologyProvider();
+ assertNotNull(topologyProvider);
+
+ submitter = createSubmitter();
+ assertNotNull(submitter);
+ }
+
+ protected Submitter<Topology, ?> getSubmitter() {
+ return submitter;
+ }
+
+ @Override
+ public TopologyProvider getTopologyProvider() {
+ return topologyProvider;
+ }
+
+ protected Topology newTopology() {
+ return getTopologyProvider().newTopology();
+ }
+
+ protected Topology newTopology(String name) {
+ return getTopologyProvider().newTopology(name);
+ }
+
+ protected boolean complete(Topology topology, Condition<?> endCondition, long timeout, TimeUnit units) throws Exception {
+ return topology.getTester().complete(getSubmitter(), new JsonObject(), endCondition, timeout, units);
+ }
+
+ protected boolean complete(Topology topology, Condition<?> endCondition) throws Exception {
+ return complete(topology, endCondition, 10, TimeUnit.SECONDS);
+ }
+
+ public void completeAndValidate(String msg, Topology t,
+ TStream<String> s, int secTimeout, String... expected)
+ throws Exception {
+ completeAndValidate(true/*ordered*/, msg, t, s, secTimeout, expected);
+ }
+
+ public void completeAndValidate(boolean ordered, String msg, Topology t,
+ TStream<String> s, int secTimeout, String... expected)
+ throws Exception {
+
+ // if expected.length==0 we must run until the job completes or tmo
+ Condition<Long> tc = t.getTester().tupleCount(s,
+ expected.length == 0 ? Long.MAX_VALUE : expected.length);
+ Condition<List<String>> contents =
+ ordered ? t.getTester().streamContents(s, expected)
+ : t.getTester().contentsUnordered(s, expected);
+
+ complete(t, tc, secTimeout, TimeUnit.SECONDS);
+
+ assertTrue(msg + " contents:" + contents.getResult(), contents.valid());
+ if (expected.length != 0)
+ assertTrue("valid:" + tc.getResult(), tc.valid());
+ }
+
+ protected void assertStream(Topology t, TStream<?> s) {
+ assertSame(t, s.topology());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyElementTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyElementTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyElementTest.java
new file mode 100644
index 0000000..b66750c
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyElementTest.java
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyElement;
+import org.junit.Test;
+
+public class TopologyElementTest {
+
+ @Test
+ public void testHierachy() {
+ assertTrue(TopologyElement.class.isAssignableFrom(Topology.class));
+ assertTrue(TopologyElement.class.isAssignableFrom(TStream.class));
+ assertTrue(TopologyElement.class.isAssignableFrom(TSink.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTest.java
new file mode 100644
index 0000000..3fa3804
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTest.java
@@ -0,0 +1,160 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.mbeans.PeriodMXBean;
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.execution.services.RuntimeServices;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("abstract, provides common tests for concrete implementations")
+public abstract class TopologyTest extends TopologyAbstractTest {
+
+ @Test
+ public void testBasics() {
+ final Topology t = newTopology("T123");
+ assertEquals("T123", t.getName());
+ assertSame(t, t.topology());
+ }
+
+ @Test
+ public void testDefaultName() {
+ final Topology t = newTopology();
+ assertSame(t, t.topology());
+ assertNotNull(t.getName());
+ }
+
+ @Test
+ public void testStringContants() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("a", "b", "c");
+ assertStream(t, s);
+
+ Condition<Long> tc = t.getTester().tupleCount(s, 3);
+ Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
+ complete(t, tc);
+ assertTrue(contents.valid());
+ }
+
+ @Test
+ public void testNoStringContants() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings();
+
+ Condition<Long> tc = t.getTester().tupleCount(s, 0);
+
+ complete(t, tc);
+
+ assertTrue(tc.valid());
+ }
+
+ @Test
+ public void testRuntimeServices() throws Exception {
+ Topology t = newTopology();
+ TStream<String> s = t.strings("A");
+
+ Supplier<RuntimeServices> serviceGetter =
+ t.getRuntimeServiceSupplier();
+
+ TStream<Boolean> b = s.map(tuple ->
+ serviceGetter.get().getService(ThreadFactory.class) != null
+ && serviceGetter.get().getService(ScheduledExecutorService.class) != null
+ );
+
+ Condition<List<Boolean>> tc = t.getTester().streamContents(b, Boolean.TRUE);
+ complete(t, tc);
+
+ assertTrue(tc.valid());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAdaptablePoll() throws Exception {
+ // Ensure the API supporting an adaptable poll() is functional.
+ Job job = null;
+ try {
+ Topology t = newTopology();
+ TStream<String> s = t.poll(() -> (new Date()).toString(),
+ 1, TimeUnit.HOURS)
+ .alias("myAlias");
+
+ AtomicInteger cnt = new AtomicInteger();
+ s.peek(tuple -> cnt.incrementAndGet());
+
+ Future<Job> jf = (Future<Job>) getSubmitter().submit(t);
+ job = jf.get();
+ assertEquals(Job.State.RUNNING, job.getCurrentState());
+
+ setPollFrequency(s, 100, TimeUnit.MILLISECONDS);
+ cnt.set(0);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(3));
+ int curCnt = cnt.get();
+ assertTrue("curCnt="+curCnt, curCnt >= 20);
+
+ setPollFrequency(s, 1, TimeUnit.SECONDS);
+ cnt.set(0);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(3));
+ curCnt = cnt.get();
+ assertTrue("curCnt="+curCnt, curCnt >= 2 && curCnt <= 4);
+
+ setPollFrequency(s, 100, TimeUnit.MILLISECONDS);
+ cnt.set(0);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(3));
+ curCnt = cnt.get();
+ assertTrue("curCnt="+curCnt, curCnt >= 20);
+ }
+ finally {
+ if (job != null)
+ job.stateChange(Job.Action.CLOSE);
+ }
+
+ }
+
+ static <T> void setPollFrequency(TStream<T> pollStream, long period, TimeUnit unit) {
+ ControlService cs = pollStream.topology().getRuntimeServiceSupplier()
+ .get().getService(ControlService.class);
+ PeriodMXBean control = cs.getControl(TStream.TYPE,
+ pollStream.getAlias(), PeriodMXBean.class);
+ control.setPeriod(period, unit);
+ }
+
+}