You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:38 UTC

[37/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/JsonFunctionsTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/JsonFunctionsTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/JsonFunctionsTest.java
new file mode 100644
index 0000000..a9ff459
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/JsonFunctionsTest.java
@@ -0,0 +1,78 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.edgent.function.Function;
+import org.apache.edgent.topology.json.JsonFunctions;
+import org.junit.Test;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+
+/**
+ * Round-trip tests for the {@link JsonFunctions} converters between
+ * {@link JsonObject} and its String / byte[] representations.
+ */
+public class JsonFunctionsTest {
+
+    /**
+     * Builds a JSON object holding a mix of primitive properties, an array
+     * and a nested object, so that a successful round trip gives reasonable
+     * confidence the JsonFunctions are working.
+     *
+     * @return a freshly built test object
+     */
+    private JsonObject newTestObject() {
+        JsonObject jo = new JsonObject();
+        jo.addProperty("boolean", true);
+        jo.addProperty("character", 'c');
+        jo.addProperty("short", (short)7);
+        jo.addProperty("int", 23);
+        jo.addProperty("long", 99L);
+        jo.addProperty("float", 3.0f);
+        jo.addProperty("double", 7.128d);
+        jo.addProperty("string", "a string value");
+        JsonArray ja = new JsonArray();
+        ja.add(new JsonPrimitive(123));
+        ja.add(new JsonPrimitive(456));
+        jo.add("array", ja);
+        JsonObject jo2 = new JsonObject();
+        jo2.addProperty("int", 789);
+        jo.add("object", jo2);
+        return jo;
+    }
+
+    /** A JsonObject converted to a String and back must equal the original. */
+    @Test
+    public void testStrings() {
+        JsonObject expected = newTestObject();
+        Function<JsonObject,String> asString = JsonFunctions.asString();
+        Function<String,JsonObject> fromString = JsonFunctions.fromString();
+
+        String json = asString.apply(expected);
+        JsonObject actual = fromString.apply(json);
+
+        // assertEquals takes (expected, actual); the original object is the expectation
+        assertEquals(expected, actual);
+    }
+
+    /** A JsonObject converted to a byte[] and back must equal the original. */
+    @Test
+    public void testBytes() {
+        JsonObject expected = newTestObject();
+        Function<JsonObject,byte[]> asBytes = JsonFunctions.asBytes();
+        Function<byte[],JsonObject> fromBytes = JsonFunctions.fromBytes();
+
+        byte[] bytes = asBytes.apply(expected);
+        JsonObject actual = fromBytes.apply(bytes);
+
+        // assertEquals takes (expected, actual); the original object is the expectation
+        assertEquals(expected, actual);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
new file mode 100644
index 0000000..e77330b
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java
@@ -0,0 +1,746 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Functions;
+import org.apache.edgent.function.ToIntFunction;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.apache.edgent.topology.plumbing.Valve;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+@Ignore
+public abstract class PlumbingTest extends TopologyAbstractTest {
+	
+
+    /**
+     * Checks that each of four tuples sent through
+     * {@code PlumbingStreams.blockingDelay()} with a 300ms delay comes out
+     * at least 300ms after it was time-stamped.
+     */
+    @Test
+    public void testBlockingDelay() throws Exception {
+        // Timing variances on shared machines can cause this test to fail
+        assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+        Topology topology = newTopology();
+
+        // stamp every tuple with the time it entered the stream
+        TStream<Long> emitTimes = topology.strings("a", "b", "c", "d")
+                .map(v -> System.currentTimeMillis());
+
+        // delay stream
+        TStream<Long> delayed = PlumbingStreams.blockingDelay(emitTimes, 300, TimeUnit.MILLISECONDS);
+
+        // calculate the delay each tuple saw and keep only those delayed long enough
+        TStream<Long> longEnough = delayed
+                .modify(v -> System.currentTimeMillis() - v)
+                .filter(v -> v >= 300);
+
+        Condition<Long> tc = topology.getTester().tupleCount(longEnough, 4);
+        complete(topology, tc);
+        assertTrue("valid:" + tc.getResult(), tc.valid());
+    }
+
+    /**
+     * Checks that {@code PlumbingStreams.blockingThrottle()} with a 300ms
+     * period emits tuples no more than 320ms apart even though the
+     * downstream processing takes 200ms per tuple.
+     */
+    @Test
+    public void testBlockingThrottle() throws Exception {
+        // Timing variances on shared machines can cause this test to fail
+        assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+        Topology topology = newTopology();
+
+        TStream<Long> source = topology.strings("a", "b", "c", "d").map(v -> 0L);
+
+        // single-element array so the lambda below can update the value
+        long[] lastEmittedTimestamp = { 0 };
+
+        // throttle stream
+        TStream<Long> emittedDelays = PlumbingStreams.blockingThrottle(source, 300, TimeUnit.MILLISECONDS)
+                .map(ignored -> {
+                    // compute the delay since the last emitted tuple;
+                    // the first tuple has no predecessor so its delay is 0
+                    long now = System.currentTimeMillis();
+                    long previous = (lastEmittedTimestamp[0] == 0) ? now : lastEmittedTimestamp[0];
+                    lastEmittedTimestamp[0] = now;
+                    return now - previous;
+                    })
+                .map(delay -> {
+                    // simulate 200ms downstream processing delay
+                    try {
+                        Thread.sleep(200);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                    return delay;
+                    });
+
+        // should end up with throttled delays close to 300 (not 500 like
+        // a blockingDelay() under these same conditions would yield)
+        TStream<Long> withinLimit = emittedDelays.filter(v -> v <= 320);
+
+        Condition<Long> tc = topology.getTester().tupleCount(withinLimit, 4);
+        complete(topology, tc);
+        assertTrue("valid:" + tc.getResult(), tc.valid());
+    }
+
+    /**
+     * Checks that {@code PlumbingStreams.blockingOneShotDelay()} with a 300ms
+     * delay lets three of the four tuples through in under 300ms.
+     */
+    @Test
+    public void testOneShotDelay() throws Exception {
+        Topology topology = newTopology();
+
+        // stamp every tuple with the time it entered the stream
+        TStream<Long> emitTimes = topology.strings("a", "b", "c", "d")
+                .map(v -> System.currentTimeMillis());
+
+        // delay stream
+        TStream<Long> delayed = PlumbingStreams.blockingOneShotDelay(emitTimes, 300, TimeUnit.MILLISECONDS);
+
+        // calculate the delay each tuple saw, then drop the slow ones;
+        // the first tuple shouldn't satisfy the predicate
+        TStream<Long> quick = delayed
+                .modify(v -> System.currentTimeMillis() - v)
+                .filter(v -> v < 300);
+
+        Condition<Long> tc = topology.getTester().tupleCount(quick, 3);
+        complete(topology, tc);
+        assertTrue("valid:" + tc.getResult(), tc.valid());
+    }
+
+    /**
+     * Test tuple pairing a millisecond value with an id.
+     * <p>
+     * {@code equals()} and {@code hashCode()} are not overridden, so
+     * instances compare by identity.
+     */
+    public static class TimeAndId {
+        /** Source of the ids handed out by the no-arg constructor. */
+        private static final AtomicInteger ids = new AtomicInteger();
+        long ms;
+        final int id;
+
+        /** Creates a tuple holding the current time and the next id. */
+        public TimeAndId() {
+            this.ms = System.currentTimeMillis();
+            this.id = ids.incrementAndGet();
+        }
+
+        /**
+         * Creates a tuple with the same id as {@code tai} whose {@code ms}
+         * is the time elapsed since {@code tai.ms}.
+         *
+         * @param tai the tuple to measure the elapsed time from
+         */
+        public TimeAndId(TimeAndId tai) {
+            this.ms = System.currentTimeMillis() - tai.ms;
+            this.id = tai.id;
+        }
+
+        @Override
+        public String toString() {
+            return "TAI:" + id + "@" + ms;
+        }
+
+    }
+    
+    /**
+     * Exercises {@code PlumbingStreams.pressureReliever()} in front of a
+     * 200ms blocking delay fed by a source polled every 10ms.
+     * <p>
+     * Checks that the unrelieved path sees more tuples than the slow path,
+     * that every tuple leaving the slow path is less than 300ms old, that
+     * neither path delivers a tuple twice, and that the raw and processed
+     * streams saw the same number of distinct tuples.
+     */
+    @Test
+    public void testPressureReliever() throws Exception {
+        // Timing variances on shared machines can cause this test to fail
+        assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+        Topology topology = newTopology();
+
+        TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS);
+
+        TStream<TimeAndId> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 5);
+
+        // insert a blocking delay acting as downstream operator that cannot keep up
+        TStream<TimeAndId> slow = PlumbingStreams.blockingDelay(pr, 200, TimeUnit.MILLISECONDS);
+
+        // calculate the delay: ms becomes the time since the tuple was created
+        TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v));
+
+        // Also process raw that should be unaffected by the slow path
+        TStream<String> processed = raw.asString();
+
+        Condition<Long> tcSlowCount = topology.getTester().atLeastTupleCount(slow, 20);
+        Condition<List<TimeAndId>> tcRaw = topology.getTester().streamContents(raw);
+        Condition<List<TimeAndId>> tcSlow = topology.getTester().streamContents(slow);
+        Condition<List<TimeAndId>> tcSlowM = topology.getTester().streamContents(slowM);
+        Condition<List<String>> tcProcessed = topology.getTester().streamContents(processed);
+        complete(topology, tcSlowCount);
+
+        assertTrue("processed=" + tcProcessed.getResult().size()
+                + " slowM=" + tcSlowM.getResult().size(),
+            tcProcessed.getResult().size() > tcSlowM.getResult().size());
+        for (TimeAndId delay : tcSlowM.getResult()) {
+            assertTrue("delay too long: " + delay, delay.ms < 300);
+        }
+
+        // The non relieving path must not deliver the same tuple twice
+        // (TimeAndId compares by identity)
+        Set<TimeAndId> uniq = new HashSet<>(tcRaw.getResult());
+        assertEquals("duplicate tuples in raw", tcRaw.getResult().size(), uniq.size());
+
+        // Nor may its string form contain duplicates
+        Set<String> uniqProcessed = new HashSet<>(tcProcessed.getResult());
+        assertEquals("duplicate tuples in processed",
+            tcProcessed.getResult().size(), uniqProcessed.size());
+
+        // Must not lose any tuples in the non relieving path
+        assertEquals("raw and processed tuple counts differ",
+            uniq.size(), uniqProcessed.size());
+
+        // Might lose tuples, but must not have sent duplicates
+        uniq = new HashSet<>(tcSlow.getResult());
+        assertEquals("duplicate tuples in slow", tcSlow.getResult().size(), uniq.size());
+    }
+    
+    /**
+     * Checks that all eight tuples, in order, come out of a pressure reliever
+     * (capacity 100, single partition) that is followed by a 5 second
+     * one-shot blocking delay.
+     */
+    @Test
+    public void testPressureRelieverWithInitialDelay() throws Exception {
+        Topology topology = newTopology();
+
+        TStream<String> source = topology.strings("A", "B", "C", "D", "E", "F", "G", "H");
+
+        // every tuple maps to key 0, i.e. one partition
+        TStream<String> relieved = PlumbingStreams.pressureReliever(source, v -> 0, 100);
+        TStream<String> delayed = PlumbingStreams.blockingOneShotDelay(relieved, 5, TimeUnit.SECONDS);
+
+        Condition<Long> tcCount = topology.getTester().tupleCount(delayed, 8);
+        Condition<List<String>> contents =
+            topology.getTester().streamContents(delayed, "A", "B", "C", "D", "E", "F", "G", "H");
+        complete(topology, tcCount);
+
+        assertTrue(tcCount.valid());
+        assertTrue(contents.valid());
+    }
+    
+    /**
+     * Checks the initial state of a {@code Valve} for each constructor and
+     * that {@code setOpen()} is reflected by {@code isOpen()}.
+     */
+    @Test
+    public void testValveState() throws Exception {
+        // the no-arg constructor and an explicit 'true' both yield an open valve
+        Valve<Integer> defaultValve = new Valve<>();
+        assertTrue(defaultValve.isOpen());
+
+        Valve<Integer> openValve = new Valve<>(true);
+        assertTrue(openValve.isOpen());
+
+        // start closed, then toggle both ways
+        Valve<Integer> valve = new Valve<>(false);
+        assertFalse(valve.isOpen());
+
+        valve.setOpen(true);
+        assertTrue(valve.isOpen());
+
+        valve.setOpen(false);
+        assertFalse(valve.isOpen());
+    }
+    
+    @Test
+    public void testValveInitiallyOpen() throws Exception {
+        Topology top = newTopology("testValve");
+
+        TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+        
+        Valve<Integer> valve = new Valve<>();
+        AtomicInteger cnt = new AtomicInteger();
+        TStream<Integer> filtered = values
+                                    .peek(tuple -> {
+                                        // reject 4,5,6
+                                        int curCnt = cnt.incrementAndGet();
+                                        if (curCnt > 6)
+                                            valve.setOpen(true);
+                                        else if (curCnt > 3)
+                                            valve.setOpen(false);
+                                        })
+                                    .filter(valve);
+
+        Condition<Long> count = top.getTester().tupleCount(filtered, 7);
+        Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 1,2,3,7,8,9,10 );
+        complete(top, count);
+        assertTrue(contents.getResult().toString(), contents.valid());
+    }
+    
+    @Test
+    public void testValveInitiallyClosed() throws Exception {
+        Topology top = newTopology("testValve");
+        
+        TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+        
+        Valve<Integer> valve = new Valve<>(false);
+        
+        AtomicInteger cnt = new AtomicInteger();
+        TStream<Integer> filtered = values
+                                    .peek(tuple -> {
+                                        // reject all but 4,5,6
+                                        int curCnt = cnt.incrementAndGet();
+                                        if (curCnt > 6)
+                                            valve.setOpen(false);
+                                        else if (curCnt > 3)
+                                            valve.setOpen(true);
+                                        })
+                                    .filter(valve);
+
+        Condition<Long> count = top.getTester().tupleCount(filtered, 3);
+        Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 4,5,6 );
+        complete(top, count);
+        assertTrue(contents.getResult().toString(), contents.valid());
+    }
+    
+    /**
+     * Creates a fake analytic that simulates work by sleeping for the given
+     * period and then wraps its input value in a JSON result.
+     *
+     * @param channel channel number stored in the result's "channel" property
+     *        and reported if the sleep is interrupted
+     * @param period how long to sleep for each value
+     * @param unit time unit of {@code period}
+     * @return function producing a JsonObject with "channel" and "result"
+     *         properties; it throws a RuntimeException if interrupted
+     */
+    private Function<Integer,JsonObject> fakeAnalytic(int channel, long period, TimeUnit unit) {
+      return value -> {
+        try {
+          Thread.sleep(unit.toMillis(period));
+        } catch (InterruptedException e) {
+          // restore the interrupt status so callers further up can observe it
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("channel="+channel+" interrupted", e);
+        }
+        JsonObject jo = new JsonObject();
+        jo.addProperty("channel", channel);
+        jo.addProperty("result", value);
+        return jo;
+      };
+    }
+
+    /**
+     * Creates a fake pipeline for one channel: the fake analytic, followed by
+     * a pass-everything filter, tagged "pipeline-ch" plus the channel number.
+     *
+     * @param channel channel number used by the analytic and in the tag
+     * @param period how long the analytic sleeps for each value
+     * @param unit time unit of {@code period}
+     * @return function that adds the pipeline to a stream
+     */
+    private Function<TStream<Integer>,TStream<JsonObject>> fakePipeline(int channel, long period, TimeUnit unit) {
+      return stream -> {
+        TStream<JsonObject> analyzed = stream.map(fakeAnalytic(channel, period, unit));
+        TStream<JsonObject> passedThrough = analyzed.filter(t->true);
+        return passedThrough.tag("pipeline-ch"+channel);
+      };
+    }
+    
+    @Test
+    public void testConcurrentMap() throws Exception {
+        Topology top = newTopology("testConcurrentMap");
+        
+        int ch = 0;
+        List<Function<Integer,JsonObject>> mappers = new ArrayList<>();
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        // a couple much faster just in case something's amiss with queues
+        mappers.add(fakeAnalytic(ch++, 3, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 13, TimeUnit.MILLISECONDS));
+        
+        Function<List<JsonObject>,Integer> combiner = list -> {
+            int sum = 0;
+            int cnt = 0;
+            System.out.println("combiner: "+list);
+            for(JsonObject jo : list) {
+              assertEquals(cnt++, jo.get("channel").getAsInt());
+              sum += jo.get("result").getAsInt();
+            }
+            return sum;
+        };
+
+        TStream<Integer> values = top.of(1, 2, 3);
+        Integer[] resultTuples = new Integer[]{
+            1*mappers.size(),
+            2*mappers.size(),
+            3*mappers.size(),
+        };
+        
+        TStream<Integer> result = PlumbingStreams.concurrentMap(values, mappers, combiner);
+        
+        Condition<Long> count = top.getTester().tupleCount(result, 3);
+        Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+
+        long begin = System.currentTimeMillis();
+        complete(top, count);
+        long end = System.currentTimeMillis();
+
+        assertTrue(contents.getResult().toString(), contents.valid());
+        
+        long actDuration = end - begin;
+        long expMinSerialDuration = resultTuples.length * mappers.size() * 100;
+        long expMinDuration = resultTuples.length * 100;
+        
+        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        
+        // a gross level performance check w/concurrent channels
+        if (Boolean.getBoolean("edgent.build.ci"))
+          System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
+        else
+          assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
+              actDuration < 0.5 * expMinSerialDuration);
+    }
+    
+    @Test
+    public void testConcurrent() throws Exception {
+        Topology top = newTopology("testConcurrent");
+        
+        int ch = 0;
+        List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>();
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        
+        Function<List<JsonObject>,Integer> combiner = list -> {
+            int sum = 0;
+            int cnt = 0;
+            System.out.println("combiner: "+list);
+            for(JsonObject jo : list) {
+              assertEquals(cnt++, jo.get("channel").getAsInt());
+              sum += jo.get("result").getAsInt();
+            }
+            return sum;
+        };
+        
+        TStream<Integer> values = top.of(1, 2, 3);
+        Integer[] resultTuples = new Integer[]{
+            1*pipelines.size(),
+            2*pipelines.size(),
+            3*pipelines.size(),
+        };
+        
+        TStream<Integer> result = PlumbingStreams.concurrent(values, pipelines, combiner).tag("result");
+        
+        Condition<Long> count = top.getTester().tupleCount(result, 3);
+        Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+
+        long begin = System.currentTimeMillis();
+        complete(top, count);
+        long end = System.currentTimeMillis();
+        
+        assertTrue(contents.getResult().toString(), contents.valid());
+        
+        long actDuration = end - begin;
+        long expMinSerialDuration = resultTuples.length * pipelines.size() * 100;
+        long expMinDuration = resultTuples.length * 100;
+        
+        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        
+        // a gross level performance check w/concurrent channels
+        if (Boolean.getBoolean("edgent.build.ci"))
+          System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
+        else
+          assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
+              actDuration < 0.5 * expMinSerialDuration);
+    }
+
+    /**
+     * Creates a fake analytic for parallel channels that simulates work by
+     * sleeping for the given period and then wraps its input value, together
+     * with the channel it was given, in a JSON result.
+     *
+     * @param period how long to sleep for each value
+     * @param unit time unit of {@code period}
+     * @return function of (value, channel) producing a JsonObject with
+     *         "channel" and "result" properties; it throws a RuntimeException
+     *         if interrupted
+     */
+    private BiFunction<Integer,Integer,JsonObject> fakeParallelAnalytic(long period, TimeUnit unit) {
+      return (value,channel) -> {
+        try {
+          Thread.sleep(unit.toMillis(period));  // simulate work for this period
+        } catch (InterruptedException e) {
+          // restore the interrupt status so callers further up can observe it
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("channel="+channel+" interrupted", e);
+        }
+        JsonObject jo = new JsonObject();
+        jo.addProperty("channel", channel);
+        jo.addProperty("result", value);
+        return jo;
+      };
+    }
+    
+    /**
+     * Creates a fake pipeline for parallel channels: the fake parallel
+     * analytic, followed by a pass-everything filter, tagged "pipeline-ch"
+     * plus the channel number.
+     *
+     * @param period how long the analytic sleeps for each value
+     * @param unit time unit of {@code period}
+     * @return function of (stream, channel) that adds the pipeline to the stream
+     */
+    private BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> fakeParallelPipeline(long period, TimeUnit unit) {
+      return (stream,channel) -> {
+        // build the analytic once per channel rather than once per tuple
+        BiFunction<Integer,Integer,JsonObject> analytic = fakeParallelAnalytic(period, unit);
+        return stream
+            .map(value -> analytic.apply(value,channel))
+            .filter(t->true)
+            .tag("pipeline-ch"+channel);
+      };
+    }
+    
+    /**
+     * Creates a fake analytic over JSON tuples that simulates work by
+     * sleeping for the given period and returns its input unchanged.
+     *
+     * @param channel channel number reported if the sleep is interrupted
+     * @param period how long to sleep for each tuple
+     * @param unit time unit of {@code period}
+     * @return function returning the same JsonObject it was given; it throws
+     *         a RuntimeException if interrupted
+     */
+    private Function<JsonObject,JsonObject> fakeJsonAnalytic(int channel, long period, TimeUnit unit) {
+      return jo -> {
+        try {
+          Thread.sleep(unit.toMillis(period));  // simulate work for this period
+        } catch (InterruptedException e) {
+          // restore the interrupt status so callers further up can observe it
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("channel="+channel+" interrupted", e);
+        }
+        return jo;
+      };
+    }
+    
+    /**
+     * Creates a fake pipeline for parallel channels that records, on each
+     * JSON tuple, the time it entered ("startPipelineMsec") and left
+     * ("endPipelineMsec") the pipeline around the fake JSON analytic.
+     *
+     * @param period how long the analytic sleeps for each tuple
+     * @param unit time unit of {@code period}
+     * @return function of (stream, channel) that adds the pipeline to the stream
+     */
+    @SuppressWarnings("unused")
+    private BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> fakeParallelPipelineTiming(long period, TimeUnit unit) {
+      return (stream,channel) -> {
+        TStream<JsonObject> started = stream.map(jo -> {
+            jo.addProperty("startPipelineMsec", System.currentTimeMillis());
+            return jo;
+          });
+        TStream<JsonObject> analyzed = started
+            .map(fakeJsonAnalytic(channel, period, unit))
+            .filter(t->true);
+        TStream<JsonObject> ended = analyzed.map(jo -> {
+            jo.addProperty("endPipelineMsec", System.currentTimeMillis());
+            return jo;
+          });
+        return ended.tag("pipeline-ch"+channel);
+      };
+    }
+    
+    @Test
+    public void testParallelMap() throws Exception {
+        Topology top = newTopology("testParallelMap");
+        
+        BiFunction<Integer,Integer,JsonObject> mapper = 
+            fakeParallelAnalytic(100, TimeUnit.MILLISECONDS);
+        
+        int width = 5;
+        ToIntFunction<Integer> splitter = tuple -> tuple % width;
+        
+        Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+        TStream<Integer> values = top.of(resultTuples);
+        
+        TStream<JsonObject> result = PlumbingStreams.parallelMap(values, width, splitter, mapper).tag("result");
+        TStream<Integer> result2 = result.map(jo -> {
+            int r = jo.get("result").getAsInt();
+            assertEquals(splitter.applyAsInt(r), jo.get("channel").getAsInt());
+            return r;
+          });
+        
+        Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+        Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+    
+        long begin = System.currentTimeMillis();
+        complete(top, count);
+        long end = System.currentTimeMillis();
+        
+        assertTrue(contents.getResult().toString(), contents.valid());
+        
+        long actDuration = end - begin;
+        long expMinSerialDuration = resultTuples.length * 100;
+        long expMinDuration = (resultTuples.length / width) * 100;
+        
+        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        
+        // a gross level performance check w/parallel channels
+        if (Boolean.getBoolean("edgent.build.ci"))
+          System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
+        else
+          assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
+              actDuration < 0.5 * expMinSerialDuration);
+    }
+    
+    @Test
+    public void testParallel() throws Exception {
+        Topology top = newTopology("testParallel");
+        
+        BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline = 
+            fakeParallelPipeline(100, TimeUnit.MILLISECONDS);
+        
+        int width = 5;
+        ToIntFunction<Integer> splitter = tuple -> tuple % width;
+        
+        Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+        TStream<Integer> values = top.of(resultTuples);
+        
+        TStream<JsonObject> result = PlumbingStreams.parallel(values, width, splitter, pipeline).tag("result");
+        TStream<Integer> result2 = result.map(jo -> {
+            int r = jo.get("result").getAsInt();
+            assertEquals(splitter.applyAsInt(r), jo.get("channel").getAsInt());
+            return r;
+          });
+        
+        Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+        Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+        
+        long begin = System.currentTimeMillis();
+        complete(top, count);
+        long end = System.currentTimeMillis();
+        
+        assertTrue(contents.getResult().toString(), contents.valid());
+        
+        long actDuration = end - begin;
+        long expMinSerialDuration = resultTuples.length * 100;
+        long expMinDuration = (resultTuples.length / width) * 100;
+        
+        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        
+        // a gross level performance check w/parallel channels
+        if (Boolean.getBoolean("edgent.build.ci"))
+          System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
+        else
+          assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
+              actDuration < 0.5 * expMinSerialDuration);
+    }
+    
+    /**
+     * Runs 0..59 through {@code PlumbingStreams.parallelBalanced()} with four
+     * channels whose even-numbered pipelines take 10ms per tuple and whose
+     * odd-numbered ones take 20ms.
+     * <p>
+     * Checks that all values arrive (in any order), that every channel
+     * processed at least one tuple, that the odd channels together processed
+     * between 40% and 60% as many tuples as the even channels and, outside
+     * of CI, that the run took less than half of the estimated serial
+     * duration.
+     */
+    @Test
+    public void testParallelBalanced() throws Exception {
+        // May need to tweak the validation sensitivity or add this:
+        // Timing variances on shared machines can cause this test to fail
+        // assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+        Topology top = newTopology("testParallelBalanced");
+
+        // arrange for even channels to process ~2x as many as odd channels.
+        BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline =
+            (stream,ch) -> {
+              long delay = (ch % 2 == 0) ? 10 : 20;
+              return stream.map(fakeAnalytic(ch, delay, TimeUnit.MILLISECONDS));
+            };
+
+        int width = 4;
+        int tupCnt = 60;
+        Integer[] resultTuples = new Integer[tupCnt];
+        for (int i = 0; i < tupCnt; i++)
+          resultTuples[i] = i;
+        // per-channel tally of the tuples seen on the result stream
+        AtomicInteger[] chCounts = new AtomicInteger[width];
+        for (int ch = 0; ch < width; ch++)
+          chCounts[ch] = new AtomicInteger();
+
+        TStream<Integer> values = top.of(resultTuples);
+
+        TStream<JsonObject> result = PlumbingStreams.parallelBalanced(values, width, pipeline).tag("result");
+        TStream<Integer> result2 = result.map(jo -> {
+            int r = jo.get("result").getAsInt();
+            int ch = jo.get("channel").getAsInt();
+            chCounts[ch].incrementAndGet();
+            return r;
+          });
+
+        Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+        Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+
+        long begin = System.currentTimeMillis();
+        complete(top, count);
+        long end = System.currentTimeMillis();
+
+        assertTrue(contents.getResult().toString(), contents.valid());
+
+        long actDuration = end - begin;
+        long expMinSerialDuration = resultTuples.length * 20;
+        long expMinDuration = (resultTuples.length / width) * 20;
+
+        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        System.out.println(top.getName()+" chCounts="+Arrays.asList(chCounts));
+
+        // a gross level performance check w/parallel channels
+        if (Boolean.getBoolean("edgent.build.ci"))
+          System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
+        else
+          assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+              actDuration < 0.5 * expMinSerialDuration);
+
+        int evenChCounts = 0;
+        int oddChCounts = 0;
+        for (int ch = 0; ch < width; ch++) {
+          // report the counts so a failure can be diagnosed from the message
+          assertTrue("channel "+ch+" processed no tuples, chCounts="+Arrays.asList(chCounts),
+              chCounts[ch].get() != 0);
+          if (ch % 2 == 0)
+            evenChCounts += chCounts[ch].get();
+          else
+            oddChCounts += chCounts[ch].get();
+        }
+        assertTrue("evenChCounts="+evenChCounts+" oddChCounts="+oddChCounts,
+            oddChCounts > 0.4 * evenChCounts
+            && oddChCounts < 0.6 * evenChCounts);
+    }
+    
+//    @Test
+//    public void testParallelTiming() throws Exception {
+//        Topology top = newTopology("testParallelTiming");
+//        
+//        BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> pipeline = 
+//            fakeParallelPipelineTiming(100, TimeUnit.MILLISECONDS);
+//        
+//        int width = 5;
+//        // ToIntFunction<Integer> splitter = tuple -> tuple % width;
+//        ToIntFunction<JsonObject> splitter = jo -> jo.get("result").getAsInt() % width;
+//        
+//        Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+//        TStream<Integer> values = top.of(resultTuples);
+//        
+//        TStream<JsonObject> inStream = values.map(value -> {
+//            JsonObject jo = new JsonObject();
+//            jo.addProperty("result", value);
+//            jo.addProperty("channel", splitter.applyAsInt(jo));
+//            jo.addProperty("enterParallelMsec", System.currentTimeMillis());
+//            return jo;
+//          });
+//        TStream<JsonObject> result = PlumbingStreams.parallel(inStream, width, splitter, pipeline).tag("result");
+//        TStream<Integer> result2 = result.map(jo -> {
+//            jo.addProperty("exitParallelMsec", System.currentTimeMillis());
+//            System.out.println("ch="+jo.get("channel").getAsInt()
+//                +" endPipeline-startPipeline="
+//                  +(jo.get("endPipelineMsec").getAsLong()
+//                    - jo.get("startPipelineMsec").getAsLong())
+//                +" exitParallel-startPipeine="
+//                  +(jo.get("exitParallelMsec").getAsLong()
+//                      - jo.get("startPipelineMsec").getAsLong()));
+//            int r = jo.get("result").getAsInt();
+//            assertEquals(splitter.applyAsInt(jo), jo.get("channel").getAsInt());
+//            return r;
+//          });
+//        
+//        Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+//        Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+//        long begin = System.currentTimeMillis();
+//        complete(top, count);
+//        long end = System.currentTimeMillis();
+//        assertTrue(contents.getResult().toString(), contents.valid());
+//        
+//        long actDuration = end - begin;
+//        
+//        long expMinSerialDuration = resultTuples.length * 100;
+//        long expMinDuration = (resultTuples.length / width) * 100;
+//        
+//        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+//        
+//        // a gross level performance check w/parallel channels
+//        assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
+//            actDuration < 0.5 * expMinSerialDuration);
+//    }
+
+    /**
+     * Test PlumbingStreams.gate() with a single-permit semaphore that the
+     * sink releases after every tuple.
+     * <p>
+     * For each of the five tuples the sink is expected to observe 0 available
+     * permits before its release() and 1 afterwards.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testGate() throws Exception {
+        Topology top = newTopology("testGate");
+
+        Semaphore permits = new Semaphore(1);
+        TStream<String> gated = PlumbingStreams.gate(
+            top.strings("a", "b", "c", "d", "e"), permits);
+
+        // One (0, 1) pair is expected per tuple.
+        List<Integer> expectedPermits = Arrays.asList(0, 1, 0, 1, 0, 1, 0, 1, 0, 1);
+        ArrayList<Integer> observedPermits = new ArrayList<>();
+
+        gated.sink(tuple -> {
+            // observed before this tuple's permit is handed back
+            observedPermits.add(permits.availablePermits());
+            permits.release();
+            // observed right after the release
+            observedPermits.add(permits.availablePermits());
+        });
+
+        Condition<List<String>> contents = top.getTester()
+            .streamContents(gated, "a", "b", "c", "d", "e");
+        complete(top, contents);
+
+        assertTrue("valid:" + contents.getResult(), contents.valid());
+        assertTrue("valid:" + observedPermits, observedPermits.equals(expectedPermits));
+    }
+
+    /**
+     * Test PlumbingStreams.gate() when the sink never releases a permit.
+     * <p>
+     * The semaphore starts with 3 permits, so only the first three of the
+     * five source tuples are expected at the sink, which should observe
+     * 2, 1 and then 0 available permits.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testGateWithLocking() throws Exception {
+        Topology top = newTopology("testGateWithLocking");
+
+        Semaphore permits = new Semaphore(3);
+        TStream<String> gated = PlumbingStreams.gate(
+            top.strings("a", "b", "c", "d", "e"), permits);
+
+        List<Integer> expectedPermits = Arrays.asList(2, 1, 0);
+        ArrayList<Integer> observedPermits = new ArrayList<>();
+
+        // Record the permits still available; deliberately no release() here.
+        gated.sink(tuple -> observedPermits.add(permits.availablePermits()));
+
+        Condition<List<String>> contents = top.getTester().streamContents(gated, "a", "b", "c");
+        complete(top, contents, 1000, TimeUnit.MILLISECONDS);
+
+        assertTrue("valid:" + contents.getResult(), contents.valid());
+        assertTrue("valid:" + observedPermits, observedPermits.equals(expectedPermits));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java
new file mode 100644
index 0000000..7625667
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java
@@ -0,0 +1,875 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TWindow;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+public abstract class TStreamTest extends TopologyAbstractTest {
+
+    /**
+     * Test alias(): a stream starts without an alias, alias() returns the
+     * same stream, a second alias() is rejected with IllegalStateException,
+     * and the alias is readable from a function of the running topology.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testAlias() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> stream = top.strings("a", "b");
+        assertEquals(null, stream.getAlias());
+
+        TStream<String> aliased = stream.alias("sAlias");
+        assertSame(stream, aliased);
+        assertEquals("sAlias", stream.getAlias());
+
+        try {
+            stream.alias("another");  // expect ISE - alias already set
+            assertTrue(false);
+        } catch (IllegalStateException e) {
+            ; // expected
+        }
+
+        // test access at runtime
+        TStream<String> checked = stream
+            .peek(tuple -> assertEquals("sAlias", stream.getAlias()))
+            .filter(tuple -> true);
+
+        // just verify that alias presence doesn't otherwise muck up things
+        Condition<Long> tupleCount = top.getTester().tupleCount(checked, 2);
+        Condition<List<String>> contents = top.getTester().streamContents(checked, "a", "b");
+        complete(top, tupleCount);
+
+        assertTrue("contents "+contents.getResult(), contents.valid());
+    }
+
+    /**
+     * Test tag(): a stream starts with no tags, tag() returns the same
+     * stream, tags accumulate across calls (redundant adds are fine), and the
+     * tags are readable from a function of the running topology.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testTag() throws Exception {
+
+        Topology top = newTopology();
+
+        List<String> expectedTags = new ArrayList<>(Arrays.asList("tag1", "tag2"));
+
+        TStream<String> stream = top.strings("a", "b");
+        assertEquals(0, stream.getTags().size());
+
+        TStream<String> tagged = stream.tag("tag1", "tag2");
+        assertSame(stream, tagged);
+        assertTrue("s.tags="+stream.getTags(), stream.getTags().containsAll(expectedTags));
+
+        expectedTags.add("tag3");
+        stream.tag("tag3");
+        assertTrue("s.tags="+stream.getTags(), stream.getTags().containsAll(expectedTags));
+
+        stream.tag("tag3", "tag2", "tag1");  // ok to redundantly add
+        assertTrue("s.tags="+stream.getTags(), stream.getTags().containsAll(expectedTags));
+
+        // test access at runtime
+        TStream<String> checked = stream.peek(tuple -> {
+            assertTrue("s.tags="+stream.getTags(), stream.getTags().containsAll(expectedTags));
+        }).filter(tuple -> true);
+
+        // just verify that tag presence doesn't otherwise muck up things
+        Condition<Long> tupleCount = top.getTester().tupleCount(checked, 2);
+        Condition<List<String>> contents = top.getTester().streamContents(checked, "a", "b");
+        complete(top, tupleCount);
+
+        assertTrue("contents "+contents.getResult(), contents.valid());
+    }
+
+    /**
+     * Test filter(): only the tuple equal to "b" is expected downstream.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testFilter() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> source = top.strings("a", "b", "c");
+        TStream<String> onlyB = source.filter("b"::equals);
+        assertStream(top, onlyB);
+
+        Condition<Long> tupleCount = top.getTester().tupleCount(onlyB, 1);
+        Condition<List<String>> contents = top.getTester().streamContents(onlyB, "b");
+        complete(top, tupleCount);
+
+        assertTrue(contents.valid());
+    }
+
+    /**
+     * Test peek(): it returns the stream it was called on and its function
+     * sees every tuple, in stream order.
+     * This will only work with an embedded setup.
+     *
+     * @throws Exception on failure
+     */
+    @Test
+    public void testPeek() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> source = top.strings("a", "b", "c");
+        List<String> peekedValues = new ArrayList<>();
+        TStream<String> peeked = source.peek(peekedValues::add);
+        assertSame(source, peeked);
+
+        Condition<Long> tupleCount = top.getTester().tupleCount(source, 3);
+        Condition<List<String>> contents = top.getTester().streamContents(source, "a", "b", "c");
+        complete(top, tupleCount);
+
+        assertTrue(contents.valid());
+        assertEquals(contents.getResult(), peekedValues);
+    }
+
+	/**
+	 * Test several peek() calls on one stream: each returns that same stream
+	 * and, per tuple, the peek functions are expected to run in the order
+	 * they were added.
+	 * @throws Exception on failure
+	 */
+	@Test
+	public void testMultiplePeek() throws Exception {
+
+		Topology top = newTopology();
+
+		TStream<String> source = top.strings("a", "b", "c");
+		List<String> peekedValues = new ArrayList<>();
+
+		TStream<String> first = source.peek(tuple -> peekedValues.add(tuple + "1st"));
+		assertSame(source, first);
+		TStream<String> second = source.peek(tuple -> peekedValues.add(tuple + "2nd"));
+		assertSame(source, second);
+		TStream<String> third = source.peek(tuple -> peekedValues.add(tuple + "3rd"));
+		assertSame(source, third);
+
+		Condition<Long> tupleCount = top.getTester().tupleCount(source, 3);
+		Condition<List<String>> contents = top.getTester().streamContents(source, "a", "b", "c");
+		complete(top, tupleCount);
+
+		assertTrue(contents.valid());
+		List<String> expected = Arrays.asList(
+				"a1st", "a2nd", "a3rd",
+				"b1st", "b2nd", "b3rd",
+				"c1st", "c2nd", "c3rd");
+		assertEquals(expected, peekedValues);
+	}
+
+    /**
+     * Test map(): decimal strings are converted to Integer tuples.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testMap() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> text = top.strings("32", "423", "-746");
+        TStream<Integer> numbers = text.map(tuple -> Integer.valueOf(tuple));
+        assertStream(top, numbers);
+
+        Condition<Long> tupleCount = top.getTester().tupleCount(numbers, 3);
+        Condition<List<Integer>> contents = top.getTester().streamContents(numbers, 32, 423, -746);
+        complete(top, tupleCount);
+
+        assertTrue(contents.getResult().toString(), contents.valid());
+    }
+    
+    /**
+     * Test modify() with a function that returns null for some tuples:
+     * the negative value is expected to be absent downstream while the
+     * others arrive incremented by 27.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testModifyWithDrops() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<Integer> parsed = top.strings("32", "423", "-746").map(Integer::valueOf);
+        TStream<Integer> modified = parsed.modify(tuple -> {
+            if (tuple < 0)
+                return null;
+            return tuple + 27;
+        });
+        assertStream(top, modified);
+
+        Condition<Long> tupleCount = top.getTester().tupleCount(modified, 2);
+        Condition<List<Integer>> contents = top.getTester().streamContents(modified, 59, 450);
+        complete(top, tupleCount);
+
+        assertTrue(contents.getResult().toString(), contents.valid());
+    }
+
+    /**
+     * Test modify(): every tuple gets an "M" suffix.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testModify() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> source = top.strings("a", "b", "c");
+        TStream<String> suffixed = source.modify(tuple -> tuple.concat("M"));
+        assertStream(top, suffixed);
+
+        Condition<Long> tupleCount = top.getTester().tupleCount(suffixed, 3);
+        Condition<List<String>> contents =
+                top.getTester().streamContents(suffixed, "aM", "bM", "cM");
+        complete(top, tupleCount);
+
+        assertTrue(contents.valid());
+    }
+    
+    /**
+     * Test flatMap(): each sentence is split on spaces and the words are
+     * expected downstream as individual tuples, in order.
+     * @throws Exception on failure
+     */
+    @Test
+    public void tesFlattMap() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> sentences = top.strings(
+                "mary had a little lamb",
+                "its fleece was white as snow");
+        TStream<String> words = sentences.flatMap(sentence -> Arrays.asList(sentence.split(" ")));
+        assertStream(top, words);
+
+        Condition<List<String>> contents = top.getTester().streamContents(words,
+                "mary", "had", "a", "little", "lamb",
+                "its", "fleece", "was", "white", "as", "snow");
+        complete(top, contents);
+
+        assertTrue(contents.getResult().toString(), contents.valid());
+    }
+    
+    /**
+     * Test flatMap() with a function that returns null instead of an
+     * Iterable for one input tuple; that tuple is expected to contribute
+     * nothing downstream.
+     * @throws Exception on failure
+     */
+    @Test
+    public void tesFlattMapWithNullIterator() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> sentences = top.strings(
+                "mary had a little lamb",
+                "NOTUPLES",
+                "its fleece was white as snow");
+        TStream<String> words = sentences.flatMap(sentence -> {
+            if (sentence.equals("NOTUPLES"))
+                return null;
+            return Arrays.asList(sentence.split(" "));
+        });
+        assertStream(top, words);
+
+        Condition<List<String>> contents = top.getTester().streamContents(words,
+                "mary", "had", "a", "little", "lamb",
+                "its", "fleece", "was", "white", "as", "snow");
+        complete(top, contents);
+
+        assertTrue(contents.getResult().toString(), contents.valid());
+    }
+    
+    /**
+     * Test flatMap() with an Iterable that contains null elements: the
+     * words at index 2 and 4 of each sentence are replaced by null and are
+     * expected to be absent downstream.
+     * @throws Exception on failure
+     */
+    @Test
+    public void tesFlattMapWithNullValues() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> sentences = top.strings(
+                "mary had a little lamb",
+                "its fleece was white as snow");
+        TStream<String> words = sentences.flatMap(sentence -> {
+            List<String> parts = Arrays.asList(sentence.split(" "));
+            parts.set(2, null);
+            parts.set(4, null);
+            return parts;
+        });
+        assertStream(top, words);
+
+        Condition<List<String>> contents = top.getTester().streamContents(words,
+                "mary", "had", "little",
+                "its", "fleece", "white", "snow");
+        complete(top, contents);
+
+        assertTrue(contents.getResult().toString(), contents.valid());
+    }
+
+    /**
+     * Test split() with no drops.
+     * <p>
+     * The splitter returns the offset of the tuple's first letter from 'a';
+     * the expected contents correspond to that value taken modulo the
+     * number of outputs (3).
+     * @throws Exception on failure
+     */
+    @Test
+    public void testSplit() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> source = top.strings(
+                "a1", "b1", "a2", "c1", "e1", "c2", "c3", "b2", "a3", "b3", "d1", "e2");
+        List<TStream<String>> outputs = source.split(3, tuple -> tuple.charAt(0) - 'a');
+        TStream<String> out0 = outputs.get(0);
+        TStream<String> out1 = outputs.get(1);
+        TStream<String> out2 = outputs.get(2);
+
+        Condition<Long> count0 = top.getTester().tupleCount(out0, 4);
+        Condition<Long> count1 = top.getTester().tupleCount(out1, 5);
+        Condition<Long> count2 = top.getTester().tupleCount(out2, 3);
+        Condition<List<String>> contents0 =
+                top.getTester().streamContents(out0, "a1", "a2", "a3", "d1");
+        Condition<List<String>> contents1 =
+                top.getTester().streamContents(out1, "b1", "e1", "b2", "b3", "e2");
+        Condition<List<String>> contents2 =
+                top.getTester().streamContents(out2, "c1", "c2", "c3");
+
+        complete(top, top.getTester().and(count0, count1, count2));
+
+        assertTrue(contents0.toString(), contents0.valid());
+        assertTrue(contents1.toString(), contents1.valid());
+        assertTrue(contents2.toString(), contents2.valid());
+    }
+
+    /**
+     * Test split() with drops.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testSplitWithDrops() throws Exception {
+
+        Topology t = newTopology();
+
+        TStream<String> s = t.strings("a1", "b1", "a2", "c1", "e1", "c2", "c3", "b2", "a3", "b3", "d1", "e2");
+        List<TStream<String>> splits = s.split(3, tuple -> {
+            switch (tuple.charAt(0)) {
+            case 'a':
+                return 1;
+            case 'b':
+                return 4;
+            case 'c':
+                return 8;
+            case 'd':
+                return -34;
+            case 'e':
+                return -1;
+            default:
+                return -1;
+            }
+        });
+
+        Condition<Long> tc0 = t.getTester().tupleCount(splits.get(0), 0);
+        Condition<Long> tc1 = t.getTester().tupleCount(splits.get(1), 6);
+        Condition<Long> tc2 = t.getTester().tupleCount(splits.get(2), 3);
+        Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(0));
+        Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(1), "a1", "b1", "a2", "b2", "a3",
+                "b3");
+        Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(2), "c1", "c2", "c3");
+
+        complete(t, t.getTester().and(tc0, tc1, tc2));
+
+        assertTrue(contents0.toString(), contents0.valid());
+        assertTrue(contents1.toString(), contents1.valid());
+        assertTrue(contents2.toString(), contents2.valid());
+    }
+
+    /**
+     * Test that split() rejects a request for zero outputs with an
+     * IllegalArgumentException.
+     * @throws Exception on failure
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testSplitWithZeroOutputs() throws Exception {
+        TStream<String> source = newTopology().strings("a1");
+        source.split(0, tuple -> 0);
+    }
+
+    /**
+     * Test that split() rejects a negative number of outputs with an
+     * IllegalArgumentException.
+     * @throws Exception on failure
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testSplitWithNegativeOutputs() throws Exception {
+        TStream<String> source = newTopology().strings("a1");
+        source.split(-28, tuple -> 0);
+    }
+
+    /**
+     * Test enum data structure
+     */
+    private enum LogSeverityEnum {
+        ALERT(1), CRITICAL(2), ERROR(3), WARNING(4), NOTICE(5), INFO(6), DEBUG(7);
+
+        @SuppressWarnings("unused")
+        private final int code;
+
+        LogSeverityEnum(final int code) {
+            this.code = code;
+        }
+    }
+
+    /**
+     * Test split(enum): each log tuple is routed by the severity name that
+     * follows the underscore; severities with no matching tuple (WARNING
+     * here) are expected to yield an empty stream.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testSplitWithEnum() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> source = top.strings(
+                "Log1_ALERT", "Log2_INFO", "Log3_INFO", "Log4_INFO",
+                "Log5_ERROR", "Log6_ERROR", "Log7_CRITICAL");
+        TStream<String> logs = source.map(tuple -> tuple.toString());
+        EnumMap<LogSeverityEnum,TStream<String>> bySeverity = logs.split(
+                LogSeverityEnum.class,
+                tuple -> LogSeverityEnum.valueOf(tuple.split("_")[1]));
+
+        assertStream(top, logs);
+
+        TStream<String> alerts = bySeverity.get(LogSeverityEnum.ALERT);
+        TStream<String> infos = bySeverity.get(LogSeverityEnum.INFO);
+        TStream<String> errors = bySeverity.get(LogSeverityEnum.ERROR);
+        TStream<String> criticals = bySeverity.get(LogSeverityEnum.CRITICAL);
+        TStream<String> warnings = bySeverity.get(LogSeverityEnum.WARNING);
+
+        Condition<Long> alertCount = top.getTester().tupleCount(alerts, 1);
+        Condition<Long> infoCount = top.getTester().tupleCount(infos, 3);
+        Condition<Long> errorCount = top.getTester().tupleCount(errors, 2);
+        Condition<Long> criticalCount = top.getTester().tupleCount(criticals, 1);
+        Condition<Long> warningCount = top.getTester().tupleCount(warnings, 0);
+
+        Condition<List<String>> alertContents =
+                top.getTester().streamContents(alerts, "Log1_ALERT");
+        Condition<List<String>> infoContents =
+                top.getTester().streamContents(infos, "Log2_INFO", "Log3_INFO", "Log4_INFO");
+        Condition<List<String>> errorContents =
+                top.getTester().streamContents(errors, "Log5_ERROR", "Log6_ERROR");
+        Condition<List<String>> criticalContents =
+                top.getTester().streamContents(criticals, "Log7_CRITICAL");
+        Condition<List<String>> warningContents =
+                top.getTester().streamContents(warnings);
+
+        complete(top, top.getTester().and(
+                alertCount, infoCount, errorCount, criticalCount, warningCount));
+
+        assertTrue(alertContents.toString(), alertContents.valid());
+        assertTrue(infoContents.toString(), infoContents.valid());
+        assertTrue(errorContents.toString(), errorContents.valid());
+        assertTrue(criticalContents.toString(), criticalContents.valid());
+        assertTrue(warningContents.toString(), warningContents.valid());
+    }
+
+    /** Enum with no constants, used to exercise split(enum) on an empty enum class. */
+    private enum EnumClassWithZerosize {
+    }
+
+    /**
+     * Test that split(enum) rejects an enum class that has no constants
+     * with an IllegalArgumentException.
+     * @throws Exception on failure
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testSplitWithEnumForZeroSizeClass() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> source = top.strings("Test");
+        source.split(EnumClassWithZerosize.class,
+                tuple -> EnumClassWithZerosize.valueOf("Test"));
+    }
+
+    /**
+     * Test a two-way fan-out: a filter branch and a modify branch fed by
+     * the same stream.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testFanout2() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> source = top.strings("a", "b", "c");
+        TStream<String> filtered = source.filter("b"::equals);
+        TStream<String> modified = source.modify(tuple -> tuple.concat("fo2"));
+
+        Condition<Long> modifiedCount = top.getTester().tupleCount(modified, 3);
+        Condition<List<String>> filteredContents =
+                top.getTester().streamContents(filtered, "b");
+        Condition<List<String>> modifiedContents =
+                top.getTester().streamContents(modified, "afo2", "bfo2", "cfo2");
+
+        complete(top, top.getTester().and(modifiedContents, modifiedCount));
+
+        assertTrue(filteredContents.getResult().toString(), filteredContents.valid());
+        assertTrue(modifiedContents.getResult().toString(), modifiedContents.valid());
+    }
+    /**
+     * Test a three-way fan-out: a filter branch, a modify branch and a map
+     * (String to byte[]) branch fed by the same stream.
+     * <p>
+     * All String/byte[] conversions name UTF-8 explicitly so the test does
+     * not depend on the platform default charset.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testFanout3() throws Exception {
+
+        // Fully qualified: java.nio.charset is not among this file's imports.
+        final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+
+        Topology t = newTopology();
+
+        TStream<String> s = t.strings("a", "b", "cde");
+        TStream<String> sf = s.filter(tuple -> "b".equals(tuple));
+        TStream<String> sm = s.modify(tuple -> tuple.concat("fo2"));
+        // The lambda refers to the static constant directly so it captures no local state.
+        TStream<byte[]> st = s.map(tuple -> tuple.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
+        Condition<Long> tsfc = t.getTester().tupleCount(sf, 1);
+        Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
+        Condition<Long> tstc = t.getTester().tupleCount(st, 3);
+        Condition<List<String>> tsf = t.getTester().streamContents(sf, "b");
+        Condition<List<String>> tsm = t.getTester().streamContents(sm, "afo2", "bfo2", "cdefo2");
+        Condition<List<byte[]>> tst = t.getTester().streamContents(st,
+                "a".getBytes(utf8), "b".getBytes(utf8), "cde".getBytes(utf8));
+
+        complete(t, t.getTester().and(tsfc, tsmc, tstc));
+
+        assertTrue(tsf.getResult().toString(), tsf.valid());
+        assertTrue(tsm.getResult().toString(), tsm.valid());
+
+        // Can't use equals on byte[]
+        assertEquals(3, tst.getResult().size());
+        assertEquals("a", new String(tst.getResult().get(0), utf8));
+        assertEquals("b", new String(tst.getResult().get(1), utf8));
+        assertEquals("cde", new String(tst.getResult().get(2), utf8));
+    }
+
+    /**
+     * Fan-out test with a single peek added before the fan-out branches.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testPeekThenFanout() throws Exception {
+        final int before = 1;
+        final int middle = 0;
+        final int after = 0;
+        _testFanoutWithPeek(before, middle, after);
+    }
+
+    /**
+     * Fan-out test with a single peek added after both fan-out branches.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testFanoutThenPeek() throws Exception {
+        final int before = 0;
+        final int middle = 0;
+        final int after = 1;
+        _testFanoutWithPeek(before, middle, after);
+    }
+
+    /**
+     * Fan-out test with a single peek added between the two fan-out branches.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testPeekMiddleFanout() throws Exception {
+        final int before = 0;
+        final int middle = 1;
+        final int after = 0;
+        _testFanoutWithPeek(before, middle, after);
+    }
+
+    /**
+     * Fan-out test with three peeks added at each position: before,
+     * between and after the fan-out branches.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testMultiPeekFanout() throws Exception {
+        final int before = 3;
+        final int middle = 3;
+        final int after = 3;
+        _testFanoutWithPeek(before, middle, after);
+    }
+
+    /**
+     * Common body of the peek/fan-out tests.
+     * <p>
+     * Builds a filter branch and a modify branch from one source stream and
+     * adds counting peeks to the source at three points of the declaration.
+     * The expected tuples on both branches carry the total number of peeks,
+     * regardless of where each peek was added.
+     *
+     * @param numBefore number of peeks added before the filter branch
+     * @param numMiddle number of peeks added between the filter and modify branches
+     * @param numAfter number of peeks added after the modify branch
+     * @throws Exception on failure
+     */
+    void _testFanoutWithPeek(int numBefore, int numMiddle, int numAfter) throws Exception {
+
+        Topology top = newTopology();
+
+        List<Peeked> values = new ArrayList<>(
+                Arrays.asList(new Peeked(33), new Peeked(-214), new Peeked(9234)));
+        for (Peeked p : values)
+            assertEquals(0, p.peekedCnt);
+
+        TStream<Peeked> source = top.collection(values);
+        for (int remaining = numBefore; remaining > 0; remaining--)
+            source.peek(tuple -> tuple.peekedCnt++);
+
+        TStream<Peeked> filtered = source.filter(tuple -> tuple.value > 0);
+        for (int remaining = numMiddle; remaining > 0; remaining--)
+            source.peek(tuple -> tuple.peekedCnt++);
+
+        TStream<Peeked> modified =
+                source.modify(tuple -> new Peeked(tuple.value + 37, tuple.peekedCnt));
+        for (int remaining = numAfter; remaining > 0; remaining--)
+            source.peek(tuple -> tuple.peekedCnt++);
+
+        int totPeeks = numBefore + numMiddle + numAfter;
+        Condition<Long> filteredCount = top.getTester().tupleCount(filtered, 2);
+        Condition<Long> modifiedCount = top.getTester().tupleCount(modified, 3);
+        Condition<List<Peeked>> filteredContents = top.getTester().streamContents(filtered,
+                new Peeked(33, totPeeks), new Peeked(9234, totPeeks));
+        Condition<List<Peeked>> modifiedContents = top.getTester().streamContents(modified,
+                new Peeked(70, totPeeks), new Peeked(-177, totPeeks), new Peeked(9271, totPeeks));
+
+        complete(top, top.getTester().and(filteredCount, modifiedCount));
+
+        assertTrue(filteredContents.getResult().toString(), filteredContents.valid());
+        assertTrue(modifiedContents.getResult().toString(), modifiedContents.valid());
+    }
+
+    /**
+     * Tuple type for the peek/fan-out tests: a fixed {@code value} plus a
+     * mutable count of how many times the tuple has been peeked.
+     * <p>
+     * Both fields take part in {@code equals()} and {@code hashCode()}, so
+     * two instances are equal only when value and peek count both match.
+     */
+    public static class Peeked implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        final int value;
+        int peekedCnt;
+
+        /**
+         * Create a tuple that has not been peeked.
+         * @param value the tuple's value
+         */
+        Peeked(int value) {
+            this(value, 0);
+        }
+
+        /**
+         * Create a tuple that has been peeked once or not at all.
+         * @param value the tuple's value
+         * @param peeked true for a peek count of 1, false for 0
+         */
+        Peeked(int value, boolean peeked) {
+          // The flag used to be ignored, giving a count of 1 even for false.
+          this(value, peeked ? 1 : 0);
+        }
+
+        /**
+         * Create a tuple with an explicit peek count.
+         * @param value the tuple's value
+         * @param peekedCnt number of times the tuple is considered peeked
+         */
+        Peeked(int value, int peekedCnt) {
+          this.value = value;
+          this.peekedCnt = peekedCnt;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + peekedCnt;
+            result = prime * result + value;
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+            Peeked other = (Peeked) obj;
+            return peekedCnt == other.peekedCnt && value == other.value;
+        }
+
+        @Override
+        public String toString() {
+          return "{" + "value=" + value + " peekedCnt=" + peekedCnt + "}";
+        }
+    }
+    
+    /**
+     * Test union() in the cases where it is expected to return the stream
+     * it was called on: union with itself, with an empty set, and with a
+     * set containing only itself.
+     *
+     * @throws Exception on failure
+     */
+    @Test
+    public void testUnionWithSelf() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> stream = top.strings("a", "b", "c");
+
+        assertSame(stream, stream.union(stream));
+        assertSame(stream, stream.union(Collections.emptySet()));
+        assertSame(stream, stream.union(Collections.singleton(stream)));
+    }
+    
+    /**
+     * Test union() of two distinct streams: the result is a new stream
+     * carrying the tuples of both inputs, in no particular order.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testUnion2() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> first = top.strings("a", "b", "c");
+        TStream<String> second = top.strings("d", "e");
+
+        TStream<String> merged = first.union(second);
+        assertNotSame(first, merged);
+        assertNotSame(second, merged);
+
+        TStream<String> suffixed = merged.modify(tuple -> tuple.concat("X"));
+
+        Condition<Long> tupleCount = top.getTester().tupleCount(suffixed, 5);
+        Condition<List<String>> contents = top.getTester().contentsUnordered(suffixed,
+                "aX", "bX", "cX", "dX", "eX");
+        complete(top, tupleCount);
+
+        assertTrue(tupleCount.getResult().toString(), tupleCount.valid());
+        assertTrue(contents.getResult().toString(), contents.valid());
+    }
+    
+    /**
+     * Test union() of one stream with a set of three others: the result is
+     * a new stream carrying the tuples of all four inputs, in no particular
+     * order.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testUnion4() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> first = top.strings("a", "b", "c");
+        TStream<String> second = top.strings("d", "e");
+        TStream<String> third = top.strings("f", "g", "h", "i");
+        TStream<String> fourth = top.strings("j");
+
+        TStream<String> merged = first.union(new HashSet<>(Arrays.asList(second, third, fourth)));
+        assertNotSame(first, merged);
+        assertNotSame(second, merged);
+        assertNotSame(third, merged);
+        assertNotSame(fourth, merged);
+
+        TStream<String> suffixed = merged.modify(tuple -> tuple.concat("Y"));
+
+        Condition<Long> tupleCount = top.getTester().tupleCount(suffixed, 10);
+        Condition<List<String>> contents = top.getTester().contentsUnordered(suffixed,
+                "aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
+        complete(top, tupleCount);
+
+        assertTrue(tupleCount.getResult().toString(), tupleCount.valid());
+        assertTrue(contents.getResult().toString(), contents.valid());
+    }
+    
+    /**
+     * Test union() of a stream with a set that also contains that stream:
+     * the expected result still carries each input tuple exactly once
+     * (10 tuples in total).
+     * @throws Exception on failure
+     */
+    @Test
+    public void testUnion4WithSelf() throws Exception {
+
+        Topology top = newTopology();
+
+        TStream<String> first = top.strings("a", "b", "c");
+        TStream<String> second = top.strings("d", "e");
+        TStream<String> third = top.strings("f", "g", "h", "i");
+        TStream<String> fourth = top.strings("j");
+
+        TStream<String> merged =
+                first.union(new HashSet<>(Arrays.asList(first, second, third, fourth)));
+        assertNotSame(first, merged);
+        assertNotSame(second, merged);
+        assertNotSame(third, merged);
+        assertNotSame(fourth, merged);
+
+        TStream<String> suffixed = merged.modify(tuple -> tuple.concat("Y"));
+
+        Condition<Long> tupleCount = top.getTester().tupleCount(suffixed, 10);
+        Condition<List<String>> contents = top.getTester().contentsUnordered(suffixed,
+                "aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
+        complete(top, tupleCount);
+
+        assertTrue(tupleCount.getResult().toString(), tupleCount.valid());
+        assertTrue(contents.getResult().toString(), contents.valid());
+    }
+    
+    @Test
+    public void testSink() throws Exception {
+
+        Topology t = newTopology();
+
+        TStream<String> s = t.strings("a", "b", "c");
+        
+        List<String> sinked = new ArrayList<>();
+        TSink<String> terminal = s.sink(tuple -> sinked.add(tuple));
+        assertNotNull(terminal);
+        assertSame(t, terminal.topology());
+        assertSame(s, terminal.getFeed());
+        TStream<String> s1 = s.filter(tuple -> true);
+
+        Condition<Long> tc = t.getTester().tupleCount(s1, 3);
+        complete(t, tc);
+        
+        assertEquals("a", sinked.get(0));
+        assertEquals("b", sinked.get(1));
+        assertEquals("c", sinked.get(2));
+    }
+    
+    /**
+     * Submit multiple jobs concurrently using ProcessSource.
+     * <p>
+     * Each job is built and run to completion by its own task on a
+     * fixed-size thread pool. The pool is shut down when the test finishes
+     * so its worker threads do not outlive the test.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testMultiTopology() throws Exception {
+
+        int executions = 4;
+        // Fully qualified: ExecutorService is not among this file's imports.
+        java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(executions);
+        try {
+            ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(pool);
+            for (int i = 0; i < executions; i++) {
+                completer.submit(() -> {
+                    Topology t = newTopology();
+                    TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h");
+                    s.sink((tuple) -> { if ("h".equals(tuple)) System.out.println(tuple);});
+                    Condition<Long> tc = t.getTester().tupleCount(s, 8);
+                    complete(t, tc);
+                    return true;
+                });
+            }
+            waitForCompletion(completer, executions);
+        } finally {
+            // Release the pool's threads whether or not the wait succeeded.
+            pool.shutdownNow();
+        }
+    }
+
+    /**
+     * Submit multiple jobs concurrently using ProcessSource, where each
+     * job's sink throws on the last ("h") tuple.
+     * <p>
+     * Each job is built and run by its own task on a fixed-size thread
+     * pool. The pool is shut down when the test finishes so its worker
+     * threads do not outlive the test.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testMultiTopologyWithError() throws Exception {
+
+        int executions = 4;
+        // Fully qualified: ExecutorService is not among this file's imports.
+        java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(executions);
+        try {
+            ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(pool);
+            for (int i = 0; i < executions; i++) {
+                completer.submit(() -> {
+                    Topology t = newTopology();
+                    TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h");
+                    // Throw on the 8th tuple
+                    s.sink((tuple) -> { if ("h".equals(tuple)) throw new RuntimeException("Expected Test Exception");});
+                    // Expect 7 tuples out of 8
+                    Condition<Long> tc = t.getTester().tupleCount(s, 7);
+                    complete(t, tc);
+                    return true;
+                });
+            }
+            waitForCompletion(completer, executions);
+        } finally {
+            // Release the pool's threads whether or not the wait succeeded.
+            pool.shutdownNow();
+        }
+    }
+    
+    /**
+     * Submit multiple jobs concurrently using PeriodicSource, where each
+     * job's sink throws once the polled counter reaches eight.
+     * <p>
+     * Each task polls an incrementing {@code AtomicLong} every 10
+     * milliseconds; the end condition is a tuple count of seven on the
+     * polled stream.
+     *
+     * @throws Exception on failure
+     */
+    @Test
+    public void testMultiTopologyPollWithError() throws Exception {
+
+        int executions = 4;
+        // Keep a handle on the pool: threads of a fixed pool live until the
+        // pool is explicitly shut down, so release them when the test ends.
+        java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(executions);
+        try {
+            ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(pool);
+            for (int i = 0; i < executions; i++) {
+                completer.submit(() -> {
+                    Topology t = newTopology();
+                    AtomicLong n = new AtomicLong(0);
+                    TStream<Long> s = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
+                    // Throw on the 8th tuple
+                    s.sink((tuple) -> { if (8 == n.get()) throw new RuntimeException("Expected Test Exception");});
+                    // Expect 7 tuples out of 8
+                    Condition<Long> tc = t.getTester().tupleCount(s, 7);
+                    complete(t, tc);
+                    return true;
+                });
+            }
+            waitForCompletion(completer, executions);
+        } finally {
+            // Also interrupts any task still running after a failure/timeout.
+            pool.shutdownNow();
+        }
+    }
+    
+    /**
+     * Joins a delayed lookup stream against a count-based window that is
+     * partitioned by {@code tuple % 10}.
+     * <p>
+     * The window stream carries 0..99 and the lookup stream carries 0..9.
+     * The joiner checks that the matching partition holds ten tuples that
+     * all share the lookup tuple's key, then clears the partition.
+     *
+     * @throws Exception on failure
+     */
+    @Test
+    public void testJoinWithWindow() throws Exception{
+        Topology t = newTopology();
+
+        List<Integer> ints = new ArrayList<>();
+        List<Integer> lookupInts = new ArrayList<>();
+
+        // Ints to populate the window
+        for(int i = 0; i < 100; i++){
+            ints.add(i);
+        }
+
+        // Ints to lookup partitions in window
+        for(int i = 0; i < 10; i++){
+            lookupInts.add(i);
+        }
+        TStream<Integer> intStream = t.collection(ints);
+
+        // Wait until the window is populated, and then submit tuples
+        TStream<Integer> lookupIntStream = t.source(() -> {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the thread running this source
+                // instead of swallowing it.
+                Thread.currentThread().interrupt();
+            }
+            return lookupInts;
+        });
+
+        TWindow<Integer, Integer> window = intStream.last(10, tuple -> tuple % 10);
+        TStream<Integer> joinsHappened = lookupIntStream.join(tuple -> tuple % 10, window, (number, partitionContents) -> {
+            assertEquals(10, partitionContents.size());
+            for(Integer element : partitionContents)
+                assertEquals(number % 10, element % 10);
+
+            // Causes an error if two numbers map to the same partition, which shouldn't happen
+            partitionContents.clear();
+            return 0;
+        });
+
+        Condition<Long> tc = t.getTester().tupleCount(joinsHappened, 10);
+        complete(t, tc);
+    }
+    
+    /**
+     * Joins a delayed lookup stream of 0..99 against the last tuple per key
+     * of another stream carrying the same values, keyed by identity.
+     * <p>
+     * The joiner checks that both sides of every join are equal; the end
+     * condition is one hundred joined tuples.
+     *
+     * @throws Exception on failure
+     */
+    @Test
+    public void testJoinLastWithKeyer() throws Exception{
+        Topology t = newTopology();
+
+        List<Integer> ints = new ArrayList<>();
+        for(int i = 0; i < 100; i++){
+            ints.add(i);
+        }
+
+        TStream<Integer> intStream = t.collection(ints);
+
+        // Wait until the window is populated, and then submit tuples
+        TStream<Integer> lookupIntStream = t.source(() -> {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the thread running this source
+                // instead of swallowing it.
+                Thread.currentThread().interrupt();
+            }
+            return ints;
+        });
+
+        TStream<String> joinsHappened = lookupIntStream.joinLast(tuple -> tuple, intStream, tuple -> tuple, (a, b) -> {
+            // assertEquals reports both values when the join mismatches
+            assertEquals(a, b);
+            return "0";
+        });
+
+        Condition<Long> tc = t.getTester().tupleCount(joinsHappened, 100);
+        complete(t, tc);
+    }
+
+    /**
+     * Waits until {@code numtasks} tasks submitted to {@code completer} have
+     * finished, surfacing the first failure.
+     * <p>
+     * Each task is given up to four seconds to show up as completed. A
+     * timeout or an interruption of the waiting thread fails the wait with a
+     * {@code RuntimeException}; an interruption is never counted as a
+     * completed task.
+     *
+     * @param completer completion service the tasks were submitted to
+     * @param numtasks number of task completions to wait for
+     * @throws ExecutionException if a completed task threw an exception
+     */
+    private void waitForCompletion(ExecutorCompletionService<Boolean> completer, int numtasks) throws ExecutionException {
+        int remainingTasks = numtasks;
+        while (remainingTasks > 0) {
+            try {
+                Future<Boolean> completed = completer.poll(4, TimeUnit.SECONDS);
+                if (completed == null) {
+                    System.err.println("Completer timed out");
+                    throw new RuntimeException(new TimeoutException());
+                }
+                // Rethrows the task's failure, if any, as ExecutionException.
+                completed.get();
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag and stop waiting: treating the
+                // interrupted poll as a finished task would let the test
+                // pass without every task having been checked.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            remainingTasks--;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TWindowTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TWindowTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TWindowTest.java
new file mode 100644
index 0000000..ce66653
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TWindowTest.java
@@ -0,0 +1,174 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.apache.edgent.function.Functions.identity;
+import static org.apache.edgent.function.Functions.unpartitioned;
+import static org.apache.edgent.function.Functions.zero;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TWindow;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Common window tests (count based, time based, keyed and unpartitioned)
+ * to be run by concrete subclasses.
+ */
+@Ignore
+public abstract class TWindowTest extends TopologyAbstractTest {
+
+    /**
+     * Batches the last 100 tuples of a 1000-tuple stream under a single key
+     * and expects ten batches of 100 tuples each.
+     */
+    @Test
+    public void testCountBasedBatch() throws Exception {
+        Topology topology = newTopology();
+
+        List<Integer> values = new ArrayList<>();
+        for (int value = 0; value < 1000; value++) {
+            values.add(value);
+        }
+        TStream<Integer> source = topology.source(() -> values);
+
+        TWindow<Integer, Integer> lastHundred = source.last(100, tuple -> 0);
+        TStream<Integer> batchSizes = lastHundred.batch((tuples, key) -> tuples.size());
+
+        Condition<List<Integer>> expected = topology.getTester().streamContents(batchSizes,
+                100, 100, 100, 100, 100, 100, 100, 100, 100, 100);
+        complete(topology, expected);
+        assertTrue(expected.valid());
+    }
+
+    /**
+     * Batches a one-second window over a stream polled every 10 ms and
+     * accepts batch sizes between 90 and 110 inclusive.
+     */
+    @Test
+    public void testTimeBasedBatch() throws Exception {
+        Topology topology = newTopology();
+        TStream<Integer> ones = topology.poll(() -> 1, 10, TimeUnit.MILLISECONDS);
+
+        TWindow<Integer, Integer> lastSecond =
+                ones.last(1000, TimeUnit.MILLISECONDS, tuple -> 0);
+        TStream<Integer> batchSizes = lastSecond.batch((tuples, key) -> tuples.size());
+
+        Condition<List<Integer>> observed = topology.getTester().streamContents(batchSizes,
+                100, 100, 100, 100, 100, 100, 100, 100, 100, 100);
+        complete(topology, observed);
+
+        System.out.println(observed.getResult());
+        for (Integer batchSize : observed.getResult()) {
+            assertTrue(batchSize >= 90 && batchSize <= 110);
+        }
+    }
+
+    /**
+     * Aggregates a window keyed by the tuple itself and checks the running
+     * per-key sums.
+     */
+    @Test
+    public void testKeyedWindowSum() throws Exception {
+        Topology topology = newTopology();
+
+        TStream<Integer> integers =
+                topology.collection(Arrays.asList(1, 2, 3, 4, 4, 3, 4, 4, 3));
+        TWindow<Integer, Integer> window = integers.last(9, identity());
+        assertSame(identity(), window.getKeyFunction());
+        assertSame(topology, window.topology());
+        assertSame(integers, window.feeder());
+
+        TStream<Integer> sums = window.aggregate((tuples, key) -> {
+            // Identity keying means a partition only ever holds one distinct value
+            assertEquals(1, new HashSet<>(tuples).size());
+            int total = 0;
+            for (Integer value : tuples) {
+                total += value;
+            }
+            return total;
+        });
+
+        Condition<Long> count = topology.getTester().tupleCount(sums, 9);
+        Condition<List<Integer>> contents = topology.getTester().streamContents(sums,
+                1, 2, 3, 4, 8, 6, 12, 16, 9);
+        complete(topology, count);
+
+        assertTrue(contents.valid());
+    }
+
+    /**
+     * Aggregates an unpartitioned window of the last four tuples and checks
+     * the running sums.
+     */
+    @Test
+    public void testWindowSum() throws Exception {
+        Topology topology = newTopology();
+
+        TStream<Integer> integers = topology.collection(Arrays.asList(1, 2, 3, 4));
+        TWindow<Integer, Integer> window = integers.last(4, unpartitioned());
+        assertSame(unpartitioned(), window.getKeyFunction());
+
+        TStream<Integer> sums = window.aggregate((tuples, key) -> {
+            assertEquals(Integer.valueOf(0), key);
+            int total = 0;
+            for (Integer value : tuples) {
+                total += value;
+            }
+            return total;
+        });
+
+        Condition<Long> count = topology.getTester().tupleCount(sums, 4);
+        Condition<List<Integer>> contents =
+                topology.getTester().streamContents(sums, 1, 3, 6, 10);
+        complete(topology, count);
+
+        assertTrue(contents.valid());
+    }
+
+    /**
+     * Polls the wall clock every millisecond into a one-second window and
+     * checks that the span between the oldest and newest tuple of each
+     * aggregation stays within [0, 1060) milliseconds.
+     */
+    @Test
+    public void testTimeWindowTimeDiff() throws Exception {
+        // Timing variances on shared machines can cause this test to fail
+        assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+        Topology topology = newTopology();
+
+        // Collected oldest-to-newest spans
+        ConcurrentLinkedQueue<Long> diffs = new ConcurrentLinkedQueue<>();
+
+        // Poll the current time
+        TStream<Long> times =
+                topology.poll(() -> System.currentTimeMillis(), 1, TimeUnit.MILLISECONDS);
+
+        TWindow<Long, Integer> window = times.last(1, TimeUnit.SECONDS, unpartitioned());
+        assertSame(zero(), window.getKeyFunction());
+        TStream<Long> diffStream = window.aggregate((tuples, key) -> {
+            assertEquals(Integer.valueOf(0), key);
+            if (tuples.size() >= 2) {
+                return tuples.get(tuples.size() - 1) - tuples.get(0);
+            }
+            // A span needs at least two tuples
+            return null;
+        });
+
+        diffStream.sink(diffs::add);
+
+        Condition<Long> count = topology.getTester().tupleCount(diffStream, 5000);
+        complete(topology, count);
+
+        for (Long diff : diffs) {
+            assertTrue("Diff is: " + diff, diff >= 0 && diff < 1060);
+        }
+    }
+
+    /**
+     * Tells whether {@code actual} lies strictly between
+     * {@code (1 - tolerance) * expected} and {@code (1 + tolerance) * expected}.
+     *
+     * @param expected reference value
+     * @param actual value to check
+     * @param tolerance relative tolerance, applied as a fraction of {@code expected}
+     * @return {@code true} when {@code actual} is strictly inside the bounds
+     */
+    public static boolean withinTolerance(double expected, Double actual, double tolerance) {
+        double upper = (1.0 + tolerance) * expected;
+        double lower = (1.0 - tolerance) * expected;
+        return actual < upper && actual > lower;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyAbstractTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyAbstractTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyAbstractTest.java
new file mode 100644
index 0000000..fd413a9
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyAbstractTest.java
@@ -0,0 +1,107 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Submitter;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyProvider;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Before;
+import org.junit.Ignore;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Base class for topology tests: creates the topology provider and the
+ * submitter before each test and offers helpers to build, run and validate
+ * topologies.
+ */
+@Ignore("Abstract class providing generic topology testing.")
+public abstract class TopologyAbstractTest implements TopologyTestSetup {
+
+    private TopologyProvider topologyProvider;
+    private Submitter<Topology, Job> submitter;
+
+    /**
+     * Creates the topology provider and the submitter used by the test,
+     * failing if either is {@code null}.
+     */
+    @Before
+    public void setup() {
+        topologyProvider = createTopologyProvider();
+        assertNotNull(topologyProvider);
+
+        submitter = createSubmitter();
+        assertNotNull(submitter);
+    }
+
+    /**
+     * @return the submitter created in {@link #setup()}
+     */
+    protected Submitter<Topology, ?> getSubmitter() {
+        return submitter;
+    }
+
+    /**
+     * @return the topology provider created in {@link #setup()}
+     */
+    @Override
+    public TopologyProvider getTopologyProvider() {
+        return topologyProvider;
+    }
+
+    /**
+     * @return a new topology obtained from the topology provider
+     */
+    protected Topology newTopology() {
+        return getTopologyProvider().newTopology();
+    }
+
+    /**
+     * @param name name to give the topology
+     * @return a new named topology obtained from the topology provider
+     */
+    protected Topology newTopology(String name) {
+        return getTopologyProvider().newTopology(name);
+    }
+
+    /**
+     * Runs {@code topology} through its tester with this test's submitter
+     * and an empty JSON configuration.
+     *
+     * @param topology topology to run
+     * @param endCondition condition passed to the tester as the end condition
+     * @param timeout timeout passed to the tester
+     * @param units unit of {@code timeout}
+     * @return the value returned by the tester's {@code complete} call
+     * @throws Exception on failure
+     */
+    protected boolean complete(Topology topology, Condition<?> endCondition, long timeout, TimeUnit units) throws Exception {
+        return topology.getTester().complete(getSubmitter(), new JsonObject(), endCondition, timeout, units);
+    }
+
+    /**
+     * Same as {@link #complete(Topology, Condition, long, TimeUnit)} with a
+     * timeout of 10 seconds.
+     *
+     * @param topology topology to run
+     * @param endCondition condition passed to the tester as the end condition
+     * @return the value returned by the tester's {@code complete} call
+     * @throws Exception on failure
+     */
+    protected boolean complete(Topology topology, Condition<?> endCondition) throws Exception {
+        return complete(topology, endCondition, 10, TimeUnit.SECONDS);
+    }
+    
+    /**
+     * Runs the topology and validates the stream's contents in order.
+     *
+     * @param msg prefix for the assertion message on a contents mismatch
+     * @param t topology to run
+     * @param s stream whose tuples are validated
+     * @param secTimeout timeout in seconds
+     * @param expected expected tuples
+     * @throws Exception on failure
+     */
+    public void completeAndValidate(String msg, Topology t,
+            TStream<String> s, int secTimeout, String... expected)
+            throws Exception {
+        completeAndValidate(true/*ordered*/, msg, t, s, secTimeout, expected);
+    }
+    
+    /**
+     * Runs the topology and validates the stream's contents, either in
+     * order or unordered.
+     *
+     * @param ordered {@code true} to validate with {@code streamContents},
+     *        {@code false} to validate with {@code contentsUnordered}
+     * @param msg prefix for the assertion message on a contents mismatch
+     * @param t topology to run
+     * @param s stream whose tuples are validated
+     * @param secTimeout timeout in seconds
+     * @param expected expected tuples; when empty, the tuple-count condition
+     *        is not asserted
+     * @throws Exception on failure
+     */
+    public void completeAndValidate(boolean ordered, String msg, Topology t,
+            TStream<String> s, int secTimeout, String... expected)
+            throws Exception {
+
+        // if expected.length==0 we must run until the job completes or tmo
+        Condition<Long> tc = t.getTester().tupleCount(s, 
+                expected.length == 0 ? Long.MAX_VALUE : expected.length);
+        Condition<List<String>> contents = 
+                ordered ? t.getTester().streamContents(s, expected)
+                        : t.getTester().contentsUnordered(s, expected);
+
+        complete(t, tc, secTimeout, TimeUnit.SECONDS);
+
+        assertTrue(msg + " contents:" + contents.getResult(), contents.valid());
+        if (expected.length != 0)
+            assertTrue("valid:" + tc.getResult(), tc.valid());
+    }
+
+    /**
+     * Asserts that {@code s} reports {@code t} as its topology.
+     *
+     * @param t expected topology
+     * @param s stream to check
+     */
+    protected void assertStream(Topology t, TStream<?> s) {
+        assertSame(t, s.topology());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyElementTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyElementTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyElementTest.java
new file mode 100644
index 0000000..b66750c
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyElementTest.java
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyElement;
+import org.junit.Test;
+
+public class TopologyElementTest {
+
+    @Test
+    public void testHierachy() {
+        assertTrue(TopologyElement.class.isAssignableFrom(Topology.class));
+        assertTrue(TopologyElement.class.isAssignableFrom(TStream.class));
+        assertTrue(TopologyElement.class.isAssignableFrom(TSink.class));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTest.java
new file mode 100644
index 0000000..3fa3804
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTest.java
@@ -0,0 +1,160 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.mbeans.PeriodMXBean;
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.execution.services.RuntimeServices;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("abstract, provides common tests for concrete implementations")
+public abstract class TopologyTest extends TopologyAbstractTest {
+
+    @Test
+    public void testBasics() {
+        final Topology t = newTopology("T123");
+        assertEquals("T123", t.getName());
+        assertSame(t, t.topology());
+    }
+
+    @Test
+    public void testDefaultName() {
+        final Topology t = newTopology();
+        assertSame(t, t.topology());
+        assertNotNull(t.getName());
+    }
+
+    @Test
+    public void testStringContants() throws Exception {
+
+        Topology t = newTopology();
+
+        TStream<String> s = t.strings("a", "b", "c");
+        assertStream(t, s);
+
+        Condition<Long> tc = t.getTester().tupleCount(s, 3);
+        Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
+        complete(t, tc);
+        assertTrue(contents.valid());
+    }
+
+    @Test
+    public void testNoStringContants() throws Exception {
+
+        Topology t = newTopology();
+
+        TStream<String> s = t.strings();
+
+        Condition<Long> tc = t.getTester().tupleCount(s, 0);
+
+        complete(t, tc);
+        
+        assertTrue(tc.valid());
+    }
+    
+    @Test
+    public void testRuntimeServices() throws Exception {
+        Topology t = newTopology();
+        TStream<String> s = t.strings("A");
+        
+        Supplier<RuntimeServices> serviceGetter =
+                t.getRuntimeServiceSupplier();
+        
+        TStream<Boolean> b = s.map(tuple -> 
+            serviceGetter.get().getService(ThreadFactory.class) != null
+            && serviceGetter.get().getService(ScheduledExecutorService.class) != null
+        );
+        
+        Condition<List<Boolean>> tc = t.getTester().streamContents(b, Boolean.TRUE);
+        complete(t, tc);
+        
+        assertTrue(tc.valid());
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAdaptablePoll() throws Exception {
+        // Ensure the API supporting an adaptable poll() is functional.
+        Job job = null;
+        try {
+            Topology t = newTopology();
+            TStream<String> s = t.poll(() -> (new Date()).toString(),
+                    1, TimeUnit.HOURS)
+                    .alias("myAlias");
+            
+            AtomicInteger cnt = new AtomicInteger();
+            s.peek(tuple -> cnt.incrementAndGet()); 
+            
+            Future<Job> jf = (Future<Job>) getSubmitter().submit(t);
+            job = jf.get();
+            assertEquals(Job.State.RUNNING, job.getCurrentState());
+            
+            setPollFrequency(s, 100, TimeUnit.MILLISECONDS);
+            cnt.set(0);
+            Thread.sleep(TimeUnit.SECONDS.toMillis(3));
+            int curCnt = cnt.get();
+            assertTrue("curCnt="+curCnt, curCnt >= 20);
+            
+            setPollFrequency(s, 1, TimeUnit.SECONDS);
+            cnt.set(0);
+            Thread.sleep(TimeUnit.SECONDS.toMillis(3));
+            curCnt = cnt.get();
+            assertTrue("curCnt="+curCnt, curCnt >= 2 && curCnt <= 4);
+            
+            setPollFrequency(s, 100, TimeUnit.MILLISECONDS);
+            cnt.set(0);
+            Thread.sleep(TimeUnit.SECONDS.toMillis(3));
+            curCnt = cnt.get();
+            assertTrue("curCnt="+curCnt, curCnt >= 20);
+        }
+        finally {
+            if (job != null)
+                job.stateChange(Job.Action.CLOSE);
+        }
+        
+    }
+
+    static <T> void setPollFrequency(TStream<T> pollStream, long period, TimeUnit unit) {
+        ControlService cs = pollStream.topology().getRuntimeServiceSupplier()
+                                    .get().getService(ControlService.class);
+        PeriodMXBean control = cs.getControl(TStream.TYPE,
+                                  pollStream.getAlias(), PeriodMXBean.class);
+        control.setPeriod(period, unit);
+    }
+
+}