You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:39 UTC

[38/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/PlumbingTest.java b/api/topology/src/test/java/edgent/test/topology/PlumbingTest.java
deleted file mode 100644
index cf03668..0000000
--- a/api/topology/src/test/java/edgent/test/topology/PlumbingTest.java
+++ /dev/null
@@ -1,747 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.google.gson.JsonObject;
-
-import edgent.function.BiFunction;
-import edgent.function.Function;
-import edgent.function.Functions;
-import edgent.function.ToIntFunction;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-import edgent.topology.plumbing.Valve;
-import edgent.topology.tester.Condition;
-
-@Ignore
-public abstract class PlumbingTest extends TopologyAbstractTest {
-	
-
-	@Test
-    public void testBlockingDelay() throws Exception {
-		// Timing variances on shared machines can cause this test to fail
-		assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
-        Topology topology = newTopology();
-        
-        TStream<String> strings = topology.strings("a", "b", "c", "d");
-        
-        TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
-        
-        // delay stream
-        starts = PlumbingStreams.blockingDelay(starts, 300, TimeUnit.MILLISECONDS);
-        
-        // calculate display
-        starts = starts.modify(v -> System.currentTimeMillis() - v);
-        
-        starts = starts.filter(v -> v >= 300);
-        
-        Condition<Long> tc = topology.getTester().tupleCount(starts, 4);
-        complete(topology, tc);
-        assertTrue("valid:" + tc.getResult(), tc.valid());
-    }
-
-    @Test
-    public void testBlockingThrottle() throws Exception {
-		// Timing variances on shared machines can cause this test to fail
-    	assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
-        Topology topology = newTopology();
-        
-        TStream<String> strings = topology.strings("a", "b", "c", "d");
-
-        TStream<Long> emittedDelays = strings.map(v -> 0L);
-        
-        // throttle stream
-        long[] lastEmittedTimestamp = { 0 };
-        emittedDelays = PlumbingStreams.blockingThrottle(emittedDelays, 300, TimeUnit.MILLISECONDS)
-                .map(t -> {
-                    // compute the delay since the last emitted tuple
-                    long now = System.currentTimeMillis();
-                    if (lastEmittedTimestamp[0] == 0)
-                        lastEmittedTimestamp[0] = now;
-                    t = now - lastEmittedTimestamp[0];
-                    lastEmittedTimestamp[0] = now;
-                    // System.out.println("### "+t);
-                    return t;
-                    })
-                .map(t -> {
-                    // simulate 200ms downstream processing delay
-                    try {
-                        Thread.sleep(TimeUnit.MILLISECONDS.toMillis(200));
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw new RuntimeException(e);
-                    } return t;
-                    }) ;
-
-        // should end up with throttled delays close to 300 (not 500 like
-        // a blockingDelay() under these same conditions would yield)
-        emittedDelays = emittedDelays.filter(v -> v <= 320);
-        
-        Condition<Long> tc = topology.getTester().tupleCount(emittedDelays, 4);
-        complete(topology, tc);
-        assertTrue("valid:" + tc.getResult(), tc.valid());
-    }
-
-    @Test
-    public void testOneShotDelay() throws Exception {
-
-        Topology topology = newTopology();
-        
-        TStream<String> strings = topology.strings("a", "b", "c", "d");
-        
-        TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
-        
-        // delay stream
-        starts = PlumbingStreams.blockingOneShotDelay(starts, 300, TimeUnit.MILLISECONDS);
-        
-        // calculate display
-        starts = starts.modify(v -> System.currentTimeMillis() - v);
-        
-        // the first tuple shouldn't satisfy the predicate
-        starts = starts.filter(v -> v < 300);
-        
-        Condition<Long> tc = topology.getTester().tupleCount(starts, 3);
-        complete(topology, tc);
-        assertTrue("valid:" + tc.getResult(), tc.valid());
-    }
-
-    public static class TimeAndId {
-    	private static AtomicInteger ids = new AtomicInteger();
-    	long ms;
-    	final int id;
-    	
-    	public TimeAndId() {
-    		this.ms = System.currentTimeMillis();
-    		this.id = ids.incrementAndGet();
-    	}
-    	public TimeAndId(TimeAndId tai) {
-    		this.ms = System.currentTimeMillis() - tai.ms;
-    		this.id = tai.id;
-    	}
-    	@Override
-    	public String toString() {
-    		return "TAI:" + id + "@" + ms;
-    	}
-    	
-    }
-    
-    @Test
-    public void testPressureReliever() throws Exception {
-		// Timing variances on shared machines can cause this test to fail
-		assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
-        Topology topology = newTopology();
-        
-        TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS);
-           
-        
-        TStream<TimeAndId> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 5);
-        
-        // insert a blocking delay acting as downstream operator that cannot keep up
-        TStream<TimeAndId> slow = PlumbingStreams.blockingDelay(pr, 200, TimeUnit.MILLISECONDS);
-        
-        // calculate the delay
-        TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v));
-        
-        // Also process raw that should be unaffected by the slow path
-        TStream<String> processed = raw.asString();
-        
-        
-        Condition<Long> tcSlowCount = topology.getTester().atLeastTupleCount(slow, 20);
-        Condition<List<TimeAndId>> tcRaw = topology.getTester().streamContents(raw);
-        Condition<List<TimeAndId>> tcSlow = topology.getTester().streamContents(slow);
-        Condition<List<TimeAndId>> tcSlowM = topology.getTester().streamContents(slowM);
-        Condition<List<String>> tcProcessed = topology.getTester().streamContents(processed);
-        complete(topology, tcSlowCount);
-        
-        assertTrue(tcProcessed.getResult().size() > tcSlowM.getResult().size());
-        for (TimeAndId delay : tcSlowM.getResult())
-            assertTrue(delay.ms < 300);
-
-        // Must not lose any tuples in the non relieving path
-        Set<TimeAndId> uniq = new HashSet<>(tcRaw.getResult());
-        assertEquals(tcRaw.getResult().size(), uniq.size());
-
-        // Must not lose any tuples in the non relieving path
-        Set<String> uniqProcessed = new HashSet<>(tcProcessed.getResult());
-        assertEquals(tcProcessed.getResult().size(), uniqProcessed.size());
-        
-        assertEquals(uniq.size(), uniqProcessed.size());
-           
-        // Might lose tuples, but must not have send duplicates
-        uniq = new HashSet<>(tcSlow.getResult());
-        assertEquals(tcSlow.getResult().size(), uniq.size());
-    }
-    
-    @Test
-    public void testPressureRelieverWithInitialDelay() throws Exception {
-
-        Topology topology = newTopology();
-        
-        
-        TStream<String> raw = topology.strings("A", "B", "C", "D", "E", "F", "G", "H");
-        
-        TStream<String> pr = PlumbingStreams.pressureReliever(raw, v -> 0, 100);
-        
-        TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 5, TimeUnit.SECONDS);
-        
-        Condition<Long> tcCount = topology.getTester().tupleCount(pr2, 8);
-        Condition<List<String>> contents = topology.getTester().streamContents(pr2, "A", "B", "C", "D", "E", "F", "G", "H");
-        complete(topology, tcCount);
-        
-        assertTrue(tcCount.valid());
-        assertTrue(contents.valid());
-    }
-    
-    @Test
-    public void testValveState() throws Exception {
-        Valve<Integer> valve = new Valve<>();
-        assertTrue(valve.isOpen());
-        
-        valve = new Valve<>(true);
-        assertTrue(valve.isOpen());
-        
-        valve = new Valve<>(false);
-        assertFalse(valve.isOpen());
-        
-        valve.setOpen(true);
-        assertTrue(valve.isOpen());
-        
-        valve.setOpen(false);
-        assertFalse(valve.isOpen());
-    }
-    
-    @Test
-    public void testValveInitiallyOpen() throws Exception {
-        Topology top = newTopology("testValve");
-
-        TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-        
-        Valve<Integer> valve = new Valve<>();
-        AtomicInteger cnt = new AtomicInteger();
-        TStream<Integer> filtered = values
-                                    .peek(tuple -> {
-                                        // reject 4,5,6
-                                        int curCnt = cnt.incrementAndGet();
-                                        if (curCnt > 6)
-                                            valve.setOpen(true);
-                                        else if (curCnt > 3)
-                                            valve.setOpen(false);
-                                        })
-                                    .filter(valve);
-
-        Condition<Long> count = top.getTester().tupleCount(filtered, 7);
-        Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 1,2,3,7,8,9,10 );
-        complete(top, count);
-        assertTrue(contents.getResult().toString(), contents.valid());
-    }
-    
-    @Test
-    public void testValveInitiallyClosed() throws Exception {
-        Topology top = newTopology("testValve");
-        
-        TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-        
-        Valve<Integer> valve = new Valve<>(false);
-        
-        AtomicInteger cnt = new AtomicInteger();
-        TStream<Integer> filtered = values
-                                    .peek(tuple -> {
-                                        // reject all but 4,5,6
-                                        int curCnt = cnt.incrementAndGet();
-                                        if (curCnt > 6)
-                                            valve.setOpen(false);
-                                        else if (curCnt > 3)
-                                            valve.setOpen(true);
-                                        })
-                                    .filter(valve);
-
-        Condition<Long> count = top.getTester().tupleCount(filtered, 3);
-        Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 4,5,6 );
-        complete(top, count);
-        assertTrue(contents.getResult().toString(), contents.valid());
-    }
-    
-    private Function<Integer,JsonObject> fakeAnalytic(int channel, long period, TimeUnit unit) {
-      return value -> { 
-        try {
-          Thread.sleep(unit.toMillis(period));
-          JsonObject jo = new JsonObject();
-          jo.addProperty("channel", channel);
-          jo.addProperty("result", value);
-          return jo;
-        } catch (InterruptedException e) {
-          throw new RuntimeException("channel="+channel+" interrupted", e);
-        }
-      };
-    }
-
-    private Function<TStream<Integer>,TStream<JsonObject>> fakePipeline(int channel, long period, TimeUnit unit) {
-      return stream -> stream.map(fakeAnalytic(channel, period, unit)).filter(t->true).tag("pipeline-ch"+channel);
-    }
-    
-    @Test
-    public void testConcurrentMap() throws Exception {
-        Topology top = newTopology("testConcurrentMap");
-        
-        int ch = 0;
-        List<Function<Integer,JsonObject>> mappers = new ArrayList<>();
-        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
-        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
-        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
-        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
-        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
-        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
-        // a couple much faster just in case something's amiss with queues
-        mappers.add(fakeAnalytic(ch++, 3, TimeUnit.MILLISECONDS));
-        mappers.add(fakeAnalytic(ch++, 13, TimeUnit.MILLISECONDS));
-        
-        Function<List<JsonObject>,Integer> combiner = list -> {
-            int sum = 0;
-            int cnt = 0;
-            System.out.println("combiner: "+list);
-            for(JsonObject jo : list) {
-              assertEquals(cnt++, jo.get("channel").getAsInt());
-              sum += jo.get("result").getAsInt();
-            }
-            return sum;
-        };
-
-        TStream<Integer> values = top.of(1, 2, 3);
-        Integer[] resultTuples = new Integer[]{
-            1*mappers.size(),
-            2*mappers.size(),
-            3*mappers.size(),
-        };
-        
-        TStream<Integer> result = PlumbingStreams.concurrentMap(values, mappers, combiner);
-        
-        Condition<Long> count = top.getTester().tupleCount(result, 3);
-        Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
-
-        long begin = System.currentTimeMillis();
-        complete(top, count);
-        long end = System.currentTimeMillis();
-
-        assertTrue(contents.getResult().toString(), contents.valid());
-        
-        long actDuration = end - begin;
-        long expMinSerialDuration = resultTuples.length * mappers.size() * 100;
-        long expMinDuration = resultTuples.length * 100;
-        
-        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
-        
-        // a gross level performance check w/concurrent channels
-        if (Boolean.getBoolean("edgent.build.ci"))
-          System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
-        else
-          assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
-              actDuration < 0.5 * expMinSerialDuration);
-    }
-    
-    @Test
-    public void testConcurrent() throws Exception {
-        Topology top = newTopology("testConcurrent");
-        
-        int ch = 0;
-        List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>();
-        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
-        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
-        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
-        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
-        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
-        
-        Function<List<JsonObject>,Integer> combiner = list -> {
-            int sum = 0;
-            int cnt = 0;
-            System.out.println("combiner: "+list);
-            for(JsonObject jo : list) {
-              assertEquals(cnt++, jo.get("channel").getAsInt());
-              sum += jo.get("result").getAsInt();
-            }
-            return sum;
-        };
-        
-        TStream<Integer> values = top.of(1, 2, 3);
-        Integer[] resultTuples = new Integer[]{
-            1*pipelines.size(),
-            2*pipelines.size(),
-            3*pipelines.size(),
-        };
-        
-        TStream<Integer> result = PlumbingStreams.concurrent(values, pipelines, combiner).tag("result");
-        
-        Condition<Long> count = top.getTester().tupleCount(result, 3);
-        Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
-
-        long begin = System.currentTimeMillis();
-        complete(top, count);
-        long end = System.currentTimeMillis();
-        
-        assertTrue(contents.getResult().toString(), contents.valid());
-        
-        long actDuration = end - begin;
-        long expMinSerialDuration = resultTuples.length * pipelines.size() * 100;
-        long expMinDuration = resultTuples.length * 100;
-        
-        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
-        
-        // a gross level performance check w/concurrent channels
-        if (Boolean.getBoolean("edgent.build.ci"))
-          System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
-        else
-          assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
-              actDuration < 0.5 * expMinSerialDuration);
-    }
-
-    private BiFunction<Integer,Integer,JsonObject> fakeParallelAnalytic(long period, TimeUnit unit) {
-      return (value,channel) -> { 
-        try {
-          Thread.sleep(unit.toMillis(period));  // simulate work for this period
-          JsonObject jo = new JsonObject();
-          jo.addProperty("channel", channel);
-          jo.addProperty("result", value);
-          return jo;
-        } catch (InterruptedException e) {
-          throw new RuntimeException("channel="+channel+" interrupted", e);
-        }
-      };
-    }
-    
-    private BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> fakeParallelPipeline(long period, TimeUnit unit) {
-      return (stream,channel) -> stream
-          .map(value -> fakeParallelAnalytic(period, unit).apply(value,channel))
-          .filter(t->true)
-          .tag("pipeline-ch"+channel);
-    }
-    
-    private Function<JsonObject,JsonObject> fakeJsonAnalytic(int channel, long period, TimeUnit unit) {
-      return jo -> { 
-        try {
-          Thread.sleep(unit.toMillis(period));  // simulate work for this period
-          return jo;
-        } catch (InterruptedException e) {
-          throw new RuntimeException("channel="+channel+" interrupted", e);
-        }
-      };
-    }
-    
-    @SuppressWarnings("unused")
-    private BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> fakeParallelPipelineTiming(long period, TimeUnit unit) {
-      return (stream,channel) -> stream
-          .map(jo -> { jo.addProperty("startPipelineMsec", System.currentTimeMillis());
-                       return jo; })
-          .map(fakeJsonAnalytic(channel, period, unit))
-          .filter(t->true)
-          .map(jo -> { jo.addProperty("endPipelineMsec", System.currentTimeMillis());
-                      return jo; })
-          .tag("pipeline-ch"+channel);
-    }
-    
-    @Test
-    public void testParallelMap() throws Exception {
-        Topology top = newTopology("testParallelMap");
-        
-        BiFunction<Integer,Integer,JsonObject> mapper = 
-            fakeParallelAnalytic(100, TimeUnit.MILLISECONDS);
-        
-        int width = 5;
-        ToIntFunction<Integer> splitter = tuple -> tuple % width;
-        
-        Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
-        TStream<Integer> values = top.of(resultTuples);
-        
-        TStream<JsonObject> result = PlumbingStreams.parallelMap(values, width, splitter, mapper).tag("result");
-        TStream<Integer> result2 = result.map(jo -> {
-            int r = jo.get("result").getAsInt();
-            assertEquals(splitter.applyAsInt(r), jo.get("channel").getAsInt());
-            return r;
-          });
-        
-        Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
-        Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
-    
-        long begin = System.currentTimeMillis();
-        complete(top, count);
-        long end = System.currentTimeMillis();
-        
-        assertTrue(contents.getResult().toString(), contents.valid());
-        
-        long actDuration = end - begin;
-        long expMinSerialDuration = resultTuples.length * 100;
-        long expMinDuration = (resultTuples.length / width) * 100;
-        
-        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
-        
-        // a gross level performance check w/parallel channels
-        if (Boolean.getBoolean("edgent.build.ci"))
-          System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
-        else
-          assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
-              actDuration < 0.5 * expMinSerialDuration);
-    }
-    
-    @Test
-    public void testParallel() throws Exception {
-        Topology top = newTopology("testParallel");
-        
-        BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline = 
-            fakeParallelPipeline(100, TimeUnit.MILLISECONDS);
-        
-        int width = 5;
-        ToIntFunction<Integer> splitter = tuple -> tuple % width;
-        
-        Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
-        TStream<Integer> values = top.of(resultTuples);
-        
-        TStream<JsonObject> result = PlumbingStreams.parallel(values, width, splitter, pipeline).tag("result");
-        TStream<Integer> result2 = result.map(jo -> {
-            int r = jo.get("result").getAsInt();
-            assertEquals(splitter.applyAsInt(r), jo.get("channel").getAsInt());
-            return r;
-          });
-        
-        Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
-        Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
-        
-        long begin = System.currentTimeMillis();
-        complete(top, count);
-        long end = System.currentTimeMillis();
-        
-        assertTrue(contents.getResult().toString(), contents.valid());
-        
-        long actDuration = end - begin;
-        long expMinSerialDuration = resultTuples.length * 100;
-        long expMinDuration = (resultTuples.length / width) * 100;
-        
-        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
-        
-        // a gross level performance check w/parallel channels
-        if (Boolean.getBoolean("edgent.build.ci"))
-          System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
-        else
-          assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
-              actDuration < 0.5 * expMinSerialDuration);
-    }
-    
-    @Test
-    public void testParallelBalanced() throws Exception {
-        // May need tweak validation sensitivity or add this:
-        // Timing variances on shared machines can cause this test to fail
-        // assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
-        Topology top = newTopology("testParallelBalanced");
-        
-        // arrange for even channels to process ~2x as many as odd channels.
-        BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline =
-            (stream,ch) -> {
-              long delay = (ch % 2 == 0) ? 10 : 20;
-              return stream.map(fakeAnalytic(ch, delay, TimeUnit.MILLISECONDS));
-            };
-        
-        int width = 4;
-        int tupCnt = 60;
-        Integer[] resultTuples = new Integer[tupCnt];
-        for (int i = 0; i < tupCnt; i++)
-          resultTuples[i] = i;
-        AtomicInteger[] chCounts = new AtomicInteger[width];
-        for (int ch = 0; ch < width; ch++)
-          chCounts[ch] = new AtomicInteger();
-        
-        TStream<Integer> values = top.of(resultTuples);
-        
-        TStream<JsonObject> result = PlumbingStreams.parallelBalanced(values, width, pipeline).tag("result");
-        TStream<Integer> result2 = result.map(jo -> {
-            int r = jo.get("result").getAsInt();
-            int ch = jo.get("channel").getAsInt();
-            chCounts[ch].incrementAndGet();
-            return r;
-          });
-        
-        Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
-        Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
-        
-        long begin = System.currentTimeMillis();
-        complete(top, count);
-        long end = System.currentTimeMillis();
-        
-        assertTrue(contents.getResult().toString(), contents.valid());
-        
-        long actDuration = end - begin;
-        long expMinSerialDuration = resultTuples.length * 20;
-        long expMinDuration = (resultTuples.length / width) * 20;
-        
-        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
-        System.out.println(top.getName()+" chCounts="+Arrays.asList(chCounts));
-        
-        // a gross level performance check w/parallel channels
-        if (Boolean.getBoolean("edgent.build.ci"))
-          System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
-        else
-          assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
-              actDuration < 0.5 * expMinSerialDuration);
-        
-        int evenChCounts = 0;
-        int oddChCounts = 0;
-        for (int ch = 0; ch < width; ch++) {
-          assertTrue(chCounts[ch].get() != 0);
-          if (ch % 2 == 0)
-            evenChCounts += chCounts[ch].get();
-          else
-            oddChCounts += chCounts[ch].get();
-        }
-        assertTrue(oddChCounts > 0.4 * evenChCounts
-            && oddChCounts < 0.6 * evenChCounts);
-    }
-    
-//    @Test
-//    public void testParallelTiming() throws Exception {
-//        Topology top = newTopology("testParallelTiming");
-//        
-//        BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> pipeline = 
-//            fakeParallelPipelineTiming(100, TimeUnit.MILLISECONDS);
-//        
-//        int width = 5;
-//        // ToIntFunction<Integer> splitter = tuple -> tuple % width;
-//        ToIntFunction<JsonObject> splitter = jo -> jo.get("result").getAsInt() % width;
-//        
-//        Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
-//        TStream<Integer> values = top.of(resultTuples);
-//        
-//        TStream<JsonObject> inStream = values.map(value -> {
-//            JsonObject jo = new JsonObject();
-//            jo.addProperty("result", value);
-//            jo.addProperty("channel", splitter.applyAsInt(jo));
-//            jo.addProperty("enterParallelMsec", System.currentTimeMillis());
-//            return jo;
-//          });
-//        TStream<JsonObject> result = PlumbingStreams.parallel(inStream, width, splitter, pipeline).tag("result");
-//        TStream<Integer> result2 = result.map(jo -> {
-//            jo.addProperty("exitParallelMsec", System.currentTimeMillis());
-//            System.out.println("ch="+jo.get("channel").getAsInt()
-//                +" endPipeline-startPipeline="
-//                  +(jo.get("endPipelineMsec").getAsLong()
-//                    - jo.get("startPipelineMsec").getAsLong())
-//                +" exitParallel-startPipeine="
-//                  +(jo.get("exitParallelMsec").getAsLong()
-//                      - jo.get("startPipelineMsec").getAsLong()));
-//            int r = jo.get("result").getAsInt();
-//            assertEquals(splitter.applyAsInt(jo), jo.get("channel").getAsInt());
-//            return r;
-//          });
-//        
-//        Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
-//        Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
-//        long begin = System.currentTimeMillis();
-//        complete(top, count);
-//        long end = System.currentTimeMillis();
-//        assertTrue(contents.getResult().toString(), contents.valid());
-//        
-//        long actDuration = end - begin;
-//        
-//        long expMinSerialDuration = resultTuples.length * 100;
-//        long expMinDuration = (resultTuples.length / width) * 100;
-//        
-//        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
-//        
-//        // a gross level performance check w/parallel channels
-//        assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
-//            actDuration < 0.5 * expMinSerialDuration);
-//    }
-
-    @Test
-    public void testGate() throws Exception {
-        Topology topology = newTopology("testGate");
-
-        TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
-
-        Semaphore semaphore = new Semaphore(1);
-        raw = PlumbingStreams.gate(raw, semaphore);
-
-        ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
-        ArrayList<Integer> arrayResult = new ArrayList<>();
-        for (int i = 0; i < 5; i++) {
-            arrayResult.add(0);
-            arrayResult.add(1);
-        }
-
-        raw.sink(t -> {
-            //Add 0 to list because semaphore.acquire() in sync has occurred
-            resultAvailablePermits.add(semaphore.availablePermits());
-            semaphore.release();
-            //Add 1 to list because semaphore.release() has executed
-            resultAvailablePermits.add(semaphore.availablePermits());
-        });
-
-        Condition<List<String>> contents = topology.getTester()
-            .streamContents(raw, "a", "b", "c", "d", "e");
-        complete(topology, contents);
-
-        assertTrue("valid:" + contents.getResult(), contents.valid());
-        assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
-    }
-
-    @Test
-    public void testGateWithLocking() throws Exception {
-        Topology topology = newTopology("testGateWithLocking");
-
-        TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
-
-        Semaphore semaphore = new Semaphore(3);
-        raw = PlumbingStreams.gate(raw, semaphore);
-
-        ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
-        ArrayList<Integer> arrayResult = new ArrayList<>();
-        arrayResult.add(2);
-        arrayResult.add(1);
-        arrayResult.add(0);
-
-        raw.sink(t -> {
-            //Add number of availablePermits
-            resultAvailablePermits.add(semaphore.availablePermits());
-        });
-
-        Condition<List<String>> contents = topology.getTester().streamContents(raw, "a", "b", "c");
-        complete(topology, contents, 1000, TimeUnit.MILLISECONDS);
-
-        assertTrue("valid:" + contents.getResult(), contents.valid());
-        assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TStreamTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TStreamTest.java b/api/topology/src/test/java/edgent/test/topology/TStreamTest.java
deleted file mode 100644
index f24a322..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TStreamTest.java
+++ /dev/null
@@ -1,876 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.TWindow;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-
-@Ignore
-public abstract class TStreamTest extends TopologyAbstractTest {
-
-    @Test
-    public void testAlias() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("a", "b");
-        assertEquals(null, s.getAlias());
-        
-        TStream<String> s2 = s.alias("sAlias");
-        assertSame(s, s2);
-        assertEquals("sAlias", s.getAlias());
-        
-        try {
-            s.alias("another");  // expect ISE - alias already set
-            assertTrue(false);
-        } catch (IllegalStateException e) {
-            ; // expected
-        }
-        
-        // test access at runtime
-        s2 = s.peek(tuple -> {
-            assertEquals("sAlias", s.getAlias());
-        }).filter(tuple -> true);
-
-        // just verify that alias presence doesn't otherwise muck up things
-        Condition<Long> tc = t.getTester().tupleCount(s2, 2);
-        Condition<List<String>> contents = t.getTester().streamContents(s2, "a", "b");
-        complete(t, tc);
-
-        assertTrue("contents "+contents.getResult(), contents.valid());
-    }
-
-    @Test
-    public void testTag() throws Exception {
-
-        Topology t = newTopology();
-
-        List<String> tags = new ArrayList<>(Arrays.asList("tag1", "tag2"));
-        
-        TStream<String> s = t.strings("a", "b");
-        assertEquals(0, s.getTags().size());
-        
-        TStream<String> s2 = s.tag("tag1", "tag2");
-        assertSame(s, s2);
-        assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
-        
-        tags.add("tag3");
-        s.tag("tag3");
-        assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
-        
-        s.tag("tag3", "tag2", "tag1");  // ok to redundantly add
-        assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
-
-        // test access at runtime
-        s2 = s.peek(tuple -> {
-            assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
-        }).filter(tuple -> true);
-
-        // just verify that tag presence doesn't otherwise muck up things
-        Condition<Long> tc = t.getTester().tupleCount(s2, 2);
-        Condition<List<String>> contents = t.getTester().streamContents(s2, "a", "b");
-        complete(t, tc);
-
-        assertTrue("contents "+contents.getResult(), contents.valid());
-    }
-
-    @Test
-    public void testFilter() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("a", "b", "c");
-        s = s.filter(tuple -> "b".equals(tuple));
-        assertStream(t, s);
-
-        Condition<Long> tc = t.getTester().tupleCount(s, 1);
-        Condition<List<String>> contents = t.getTester().streamContents(s, "b");
-        complete(t, tc);
-
-        assertTrue(contents.valid());
-    }
-
-    /**
-     * Test Peek. This will only work with an embedded setup.
-     * 
-     * @throws Exception
-     */
-    @Test
-    public void testPeek() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("a", "b", "c");
-        List<String> peekedValues = new ArrayList<>();
-		TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple));
-		assertSame(s, speek);
-
-		Condition<Long> tc = t.getTester().tupleCount(s, 3);
-		Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
-        complete(t, tc);
-
-        assertTrue(contents.valid());
-        assertEquals(contents.getResult(), peekedValues);
-    }
-
-	@Test
-	public void testMultiplePeek() throws Exception {
-
-		Topology t = newTopology();
-
-		TStream<String> s = t.strings("a", "b", "c");
-		List<String> peekedValues = new ArrayList<>();
-		TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple + "1st"));
-		assertSame(s, speek);
-
-		TStream<String> speek2 = s.peek(tuple -> peekedValues.add(tuple + "2nd"));
-		assertSame(s, speek2);
-		TStream<String> speek3 = s.peek(tuple -> peekedValues.add(tuple + "3rd"));
-		assertSame(s, speek3);
-
-		Condition<Long> tc = t.getTester().tupleCount(s, 3);
-		Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
-		complete(t, tc);
-
-		assertTrue(contents.valid());
-        List<String> expected = Arrays.asList("a1st", "a2nd", "a3rd", "b1st", "b2nd", "b3rd", "c1st", "c2nd", "c3rd");
-		assertEquals(expected, peekedValues);
-	}
-
-    @Test
-    public void testMap() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("32", "423", "-746");
-        TStream<Integer> i = s.map(Integer::valueOf);
-        assertStream(t, i);
-
-        Condition<Long> tc = t.getTester().tupleCount(i, 3);
-        Condition<List<Integer>> contents = t.getTester().streamContents(i, 32, 423, -746);
-        complete(t, tc);
-
-        assertTrue(contents.getResult().toString(), contents.valid());
-    }
-    
-    @Test
-    public void testModifyWithDrops() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("32", "423", "-746");
-        TStream<Integer> i = s.map(Integer::valueOf);
-        i = i.modify(tuple -> tuple < 0 ? null : tuple + 27);
-        assertStream(t, i);
-
-        Condition<Long> tc = t.getTester().tupleCount(i, 2);
-        Condition<List<Integer>> contents = t.getTester().streamContents(i, 59, 450);
-        complete(t, tc);
-
-        assertTrue(contents.getResult().toString(), contents.valid());
-    }
-
-    @Test
-    public void testModify() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("a", "b", "c");
-        TStream<String> i = s.modify(tuple -> tuple.concat("M"));
-        assertStream(t, i);
-
-        Condition<Long> tc = t.getTester().tupleCount(i, 3);
-        Condition<List<String>> contents = t.getTester().streamContents(i, "aM", "bM", "cM");
-        complete(t, tc);
-
-        assertTrue(contents.valid());
-    }
-    
-    @Test
-    public void tesFlattMap() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("mary had a little lamb",
-                "its fleece was white as snow");
-        TStream<String> w = s.flatMap(tuple->Arrays.asList(tuple.split(" ")));
-        assertStream(t, w);
-
-        Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
-                "a", "little", "lamb", "its", "fleece", "was", "white", "as",
-                "snow");
-        complete(t, contents);
-
-        assertTrue(contents.getResult().toString(), contents.valid());
-    }
-    
-    @Test
-    public void tesFlattMapWithNullIterator() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("mary had a little lamb", "NOTUPLES",
-                "its fleece was white as snow");
-        TStream<String> w = s.flatMap(tuple->tuple.equals("NOTUPLES") ? null : Arrays.asList(tuple.split(" ")));
-        assertStream(t, w);
-
-        Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
-                "a", "little", "lamb", "its", "fleece", "was", "white", "as",
-                "snow");
-        complete(t, contents);
-
-        assertTrue(contents.getResult().toString(), contents.valid());
-    }
-    
-    @Test
-    public void tesFlattMapWithNullValues() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("mary had a little lamb",
-                "its fleece was white as snow");
-        TStream<String> w = s.flatMap(tuple-> {List<String> values = Arrays.asList(tuple.split(" "));
-          values.set(2, null); values.set(4, null); return values;});
-        assertStream(t, w);
-
-        Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
-                "little", "its", "fleece",  "white",
-                "snow");
-        complete(t, contents);
-
-        assertTrue(contents.getResult().toString(), contents.valid());
-    }
-
-    /**
-     * Test split() with no drops.
-     * @throws Exception on failure
-     */
-    @Test
-    public void testSplit() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("a1", "b1", "a2", "c1", "e1", "c2", "c3", "b2", "a3", "b3", "d1", "e2");
-        List<TStream<String>> splits = s.split(3, tuple -> tuple.charAt(0) - 'a');
-
-        Condition<Long> tc0 = t.getTester().tupleCount(splits.get(0), 4);
-        Condition<Long> tc1 = t.getTester().tupleCount(splits.get(1), 5);
-        Condition<Long> tc2 = t.getTester().tupleCount(splits.get(2), 3);
-        Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(0), "a1", "a2", "a3", "d1");
-        Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(1), "b1", "e1", "b2", "b3", "e2");
-        Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(2), "c1", "c2", "c3");
-
-        complete(t, t.getTester().and(tc0, tc1, tc2));
-
-        assertTrue(contents0.toString(), contents0.valid());
-        assertTrue(contents1.toString(), contents1.valid());
-        assertTrue(contents2.toString(), contents2.valid());
-    }
-
-    /**
-     * Test split() with drops.
-     * @throws Exception on failure
-     */
-    @Test
-    public void testSplitWithDrops() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("a1", "b1", "a2", "c1", "e1", "c2", "c3", "b2", "a3", "b3", "d1", "e2");
-        List<TStream<String>> splits = s.split(3, tuple -> {
-            switch (tuple.charAt(0)) {
-            case 'a':
-                return 1;
-            case 'b':
-                return 4;
-            case 'c':
-                return 8;
-            case 'd':
-                return -34;
-            case 'e':
-                return -1;
-            default:
-                return -1;
-            }
-        });
-
-        Condition<Long> tc0 = t.getTester().tupleCount(splits.get(0), 0);
-        Condition<Long> tc1 = t.getTester().tupleCount(splits.get(1), 6);
-        Condition<Long> tc2 = t.getTester().tupleCount(splits.get(2), 3);
-        Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(0));
-        Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(1), "a1", "b1", "a2", "b2", "a3",
-                "b3");
-        Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(2), "c1", "c2", "c3");
-
-        complete(t, t.getTester().and(tc0, tc1, tc2));
-
-        assertTrue(contents0.toString(), contents0.valid());
-        assertTrue(contents1.toString(), contents1.valid());
-        assertTrue(contents2.toString(), contents2.valid());
-    }
-
-    /**
-     * Test split() zero outputs
-     * @throws Exception on failure
-     */
-    @Test(expected = IllegalArgumentException.class)
-    public void testSplitWithZeroOutputs() throws Exception {
-        newTopology().strings("a1").split(0, tuple -> 0);
-    }
-
-    /**
-     * Test split() negative outputs
-     * @throws Exception on failure
-     */
-    @Test(expected = IllegalArgumentException.class)
-    public void testSplitWithNegativeOutputs() throws Exception {
-        newTopology().strings("a1").split(-28, tuple -> 0);
-    }
-
-    /**
-     * Test enum data structure
-     */
-    private enum LogSeverityEnum {
-        ALERT(1), CRITICAL(2), ERROR(3), WARNING(4), NOTICE(5), INFO(6), DEBUG(7);
-
-        @SuppressWarnings("unused")
-        private final int code;
-
-        LogSeverityEnum(final int code) {
-            this.code = code;
-        }
-    }
-
-    /**
-     * Test split(enum) with integer type enum.
-     * @throws Exception on failure
-     */
-    @Test
-    public void testSplitWithEnum() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("Log1_ALERT", "Log2_INFO", "Log3_INFO", "Log4_INFO", "Log5_ERROR", "Log6_ERROR", "Log7_CRITICAL");
-        TStream<String> i = s.map(String::toString);
-        EnumMap<LogSeverityEnum,TStream<String>> splits = i.split(LogSeverityEnum.class, e -> LogSeverityEnum.valueOf(e.split("_")[1]));
-
-        assertStream(t, i);
-
-        Condition<Long> tc0 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ALERT), 1);
-        Condition<Long> tc1 = t.getTester().tupleCount(splits.get(LogSeverityEnum.INFO), 3);
-        Condition<Long> tc2 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ERROR), 2);
-        Condition<Long> tc3 = t.getTester().tupleCount(splits.get(LogSeverityEnum.CRITICAL), 1);
-        Condition<Long> tc4 = t.getTester().tupleCount(splits.get(LogSeverityEnum.WARNING), 0);
-
-        Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(LogSeverityEnum.ALERT), "Log1_ALERT");
-        Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(LogSeverityEnum.INFO), "Log2_INFO",
-            "Log3_INFO", "Log4_INFO");
-        Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(LogSeverityEnum.ERROR), "Log5_ERROR",
-            "Log6_ERROR");
-        Condition<List<String>> contents3 = t.getTester().streamContents(splits.get(LogSeverityEnum.CRITICAL), "Log7_CRITICAL");
-        Condition<List<String>> contents4 = t.getTester().streamContents(splits.get(LogSeverityEnum.WARNING));
-
-        complete(t, t.getTester().and(tc0, tc1, tc2, tc3, tc4));
-
-
-        assertTrue(contents0.toString(), contents0.valid());
-        assertTrue(contents1.toString(), contents1.valid());
-        assertTrue(contents2.toString(), contents2.valid());
-        assertTrue(contents3.toString(), contents3.valid());
-        assertTrue(contents4.toString(), contents4.valid());
-    }
-
-    private enum EnumClassWithZerosize {
-        ;
-    }
-
-    /**
-     * Test split(enum) with integer type enum.
-     * @throws Exception on failure
-     */
-    @Test(expected = IllegalArgumentException.class)
-    public void testSplitWithEnumForZeroSizeClass() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("Test");
-        s.split(EnumClassWithZerosize.class, e -> EnumClassWithZerosize.valueOf("Test"));
-    }
-
-    @Test
-    public void testFanout2() throws Exception {
-
-        Topology t = newTopology();
-        
-        TStream<String> s = t.strings("a", "b", "c");
-        TStream<String> sf = s.filter(tuple -> "b".equals(tuple));
-        TStream<String> sm = s.modify(tuple -> tuple.concat("fo2"));
-
-        Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
-        Condition<List<String>> tsf = t.getTester().streamContents(sf, "b");
-        Condition<List<String>> tsm = t.getTester().streamContents(sm, "afo2", "bfo2", "cfo2");
-
-        complete(t, t.getTester().and(tsm, tsmc));
-
-        assertTrue(tsf.getResult().toString(), tsf.valid());
-        assertTrue(tsm.getResult().toString(), tsm.valid());
-    }
-    @Test
-    public void testFanout3() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("a", "b", "cde");
-        TStream<String> sf = s.filter(tuple -> "b".equals(tuple));
-        TStream<String> sm = s.modify(tuple -> tuple.concat("fo2"));
-        TStream<byte[]> st = s.map(tuple -> tuple.getBytes());
-
-        Condition<Long> tsfc = t.getTester().tupleCount(sf, 1);
-        Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
-        Condition<Long> tstc = t.getTester().tupleCount(st, 3);
-        Condition<List<String>> tsf = t.getTester().streamContents(sf, "b");
-        Condition<List<String>> tsm = t.getTester().streamContents(sm, "afo2", "bfo2", "cdefo2");
-        Condition<List<byte[]>> tst = t.getTester().streamContents(st, "a".getBytes(), "b".getBytes(), "cde".getBytes());
-
-        complete(t, t.getTester().and(tsfc, tsmc, tstc));
-
-        assertTrue(tsf.valid());
-        assertTrue(tsm.valid());
-
-        // Can't use equals on byte[]
-        assertEquals(3, tst.getResult().size());
-        assertEquals("a", new String(tst.getResult().get(0)));
-        assertEquals("b", new String(tst.getResult().get(1)));
-        assertEquals("cde", new String(tst.getResult().get(2)));
-    }
-
-    @Test
-    public void testPeekThenFanout() throws Exception {
-        _testFanoutWithPeek(1, 0, 0);
-    }
-
-    @Test
-    public void testFanoutThenPeek() throws Exception {
-        _testFanoutWithPeek(0, 0, 1);
-    }
-
-    @Test
-    public void testPeekMiddleFanout() throws Exception {
-        _testFanoutWithPeek(0, 1, 0);
-    }
-
-    @Test
-    public void testMultiPeekFanout() throws Exception {
-        _testFanoutWithPeek(3, 3, 3);
-    }
-
-    void _testFanoutWithPeek(int numBefore, int numMiddle, int numAfter) throws Exception {
-
-        Topology t = newTopology();
-
-        List<Peeked> values = new ArrayList<>();
-        values.add(new Peeked(33));
-        values.add(new Peeked(-214));
-        values.add(new Peeked(9234));
-        for (Peeked p : values)
-            assertEquals(0, p.peekedCnt);
-
-        TStream<Peeked> s = t.collection(values);
-        if (numBefore > 0) {
-          for (int i = 0; i < numBefore; i++)
-            s.peek(tuple -> tuple.peekedCnt++);
-        }
-
-        TStream<Peeked> sf = s.filter(tuple -> tuple.value > 0);
-        if (numMiddle > 0) {
-          for (int i = 0; i < numMiddle; i++)
-            s.peek(tuple -> tuple.peekedCnt++);
-        }
-        TStream<Peeked> sm = s.modify(tuple -> new Peeked(tuple.value + 37, tuple.peekedCnt));
-
-        if (numAfter > 0) {
-          for (int i = 0; i < numAfter; i++)
-            s.peek(tuple -> tuple.peekedCnt++);
-        }
-
-        int totPeeks = numBefore + numMiddle + numAfter;
-        Condition<Long> tsfc = t.getTester().tupleCount(sf, 2);
-        Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
-        Condition<List<Peeked>> tsf = t.getTester().streamContents(sf, new Peeked(33, totPeeks), new Peeked(9234, totPeeks));
-        Condition<List<Peeked>> tsm = t.getTester().streamContents(sm, new Peeked(70, totPeeks), new Peeked(-177, totPeeks),
-                new Peeked(9271, totPeeks));
-
-        complete(t, t.getTester().and(tsfc, tsmc));
-
-        assertTrue(tsf.getResult().toString(), tsf.valid());
-        assertTrue(tsm.getResult().toString(), tsm.valid());
-    }
-
-    public static class Peeked implements Serializable {
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + peekedCnt;
-            result = prime * result + value;
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (obj == null)
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            Peeked other = (Peeked) obj;
-            if (peekedCnt != other.peekedCnt)
-                return false;
-            if (value != other.value)
-                return false;
-            return true;
-        }
-
-        private static final long serialVersionUID = 1L;
-        final int value;
-        int peekedCnt;
-
-        Peeked(int value) {
-            this.value = value;
-        }
-
-        Peeked(int value, boolean peeked) {
-          this(value, 1);
-        }
-
-        Peeked(int value, int peekedCnt) {
-          this.value = value;
-          // this.peeked = true;
-          this.peekedCnt = peekedCnt;
-        }
-        
-        public String toString() {
-          return "{" + "value=" + value + " peekedCnt=" + peekedCnt + "}";
-        }
-    }
-    
-    /**
-     * Test Union with itself.
-     * 
-     * @throws Exception
-     */
-    @Test
-    public void testUnionWithSelf() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("a", "b", "c");
-
-        assertSame(s, s.union(s));
-        assertSame(s, s.union(Collections.emptySet()));
-        assertSame(s, s.union(Collections.singleton(s)));
-    }
-    
-    @Test
-    public void testUnion2() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s1 = t.strings("a", "b", "c");
-        TStream<String> s2 = t.strings("d", "e");
-        TStream<String> su = s1.union(s2);
-        assertNotSame(s1, su);
-        assertNotSame(s2, su);
-        TStream<String> r = su.modify(v -> v.concat("X"));
-
-        Condition<Long> tc = t.getTester().tupleCount(r, 5);
-        Condition<List<String>> contents = t.getTester().contentsUnordered(r,
-                "aX", "bX", "cX", "dX", "eX");
-        complete(t, tc);
-
-        assertTrue(tc.getResult().toString(), tc.valid());
-        assertTrue(contents.getResult().toString(), contents.valid());
-    }
-    
-    @Test
-    public void testUnion4() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s1 = t.strings("a", "b", "c");
-        TStream<String> s2 = t.strings("d", "e");
-        TStream<String> s3 = t.strings("f", "g", "h", "i");
-        TStream<String> s4 = t.strings("j");
-        TStream<String> su = s1.union(new HashSet<>(Arrays.asList(s2, s3, s4)));
-        assertNotSame(s1, su);
-        assertNotSame(s2, su);
-        assertNotSame(s3, su);
-        assertNotSame(s4, su);
-        TStream<String> r = su.modify(v -> v.concat("Y"));
-
-        Condition<Long> tc = t.getTester().tupleCount(r, 10);
-        Condition<List<String>> contents = t.getTester().contentsUnordered(r,
-                "aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
-        complete(t, tc);
-
-        assertTrue(tc.getResult().toString(), tc.valid());
-        assertTrue(contents.getResult().toString(), contents.valid());
-    }
-    
-    @Test
-    public void testUnion4WithSelf() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s1 = t.strings("a", "b", "c");
-        TStream<String> s2 = t.strings("d", "e");
-        TStream<String> s3 = t.strings("f", "g", "h", "i");
-        TStream<String> s4 = t.strings("j");
-        TStream<String> su = s1.union(new HashSet<>(Arrays.asList(s1, s2, s3, s4)));
-        assertNotSame(s1, su);
-        assertNotSame(s2, su);
-        assertNotSame(s3, su);
-        assertNotSame(s4, su);
-        TStream<String> r = su.modify(v -> v.concat("Y"));
-
-        Condition<Long> tc = t.getTester().tupleCount(r, 10);
-        Condition<List<String>> contents = t.getTester().contentsUnordered(r,
-                "aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
-        complete(t, tc);
-
-        assertTrue(tc.getResult().toString(), tc.valid());
-        assertTrue(contents.getResult().toString(), contents.valid());
-    }
-    
-    @Test
-    public void testSink() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("a", "b", "c");
-        
-        List<String> sinked = new ArrayList<>();
-        TSink<String> terminal = s.sink(tuple -> sinked.add(tuple));
-        assertNotNull(terminal);
-        assertSame(t, terminal.topology());
-        assertSame(s, terminal.getFeed());
-        TStream<String> s1 = s.filter(tuple -> true);
-
-        Condition<Long> tc = t.getTester().tupleCount(s1, 3);
-        complete(t, tc);
-        
-        assertEquals("a", sinked.get(0));
-        assertEquals("b", sinked.get(1));
-        assertEquals("c", sinked.get(2));
-    }
-    
-    /**
-     * Submit multiple jobs concurrently using ProcessSource.
-     * @throws Exception on failure
-     */
-    @Test
-    public void testMultiTopology() throws Exception {
-
-        int executions = 4;
-        ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(
-                Executors.newFixedThreadPool(executions));
-        for (int i = 0; i < executions; i++) {
-            completer.submit(() -> {
-                Topology t = newTopology();
-                TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h");
-                s.sink((tuple) -> { if ("h".equals(tuple)) System.out.println(tuple);});
-                Condition<Long> tc = t.getTester().tupleCount(s, 8);
-                complete(t, tc);
-                return true;
-            });
-        }
-        waitForCompletion(completer, executions);
-    }
-
-    /**
-     * Submit multiple jobs concurrently using ProcessSource.
-     * @throws Exception on failure
-     */
-    @Test
-    public void testMultiTopologyWithError() throws Exception {
-
-        int executions = 4;
-        ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(
-                Executors.newFixedThreadPool(executions));
-        for (int i = 0; i < executions; i++) {
-            completer.submit(() -> {
-                Topology t = newTopology();
-                TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h");
-                // Throw on the 8th tuple
-                s.sink((tuple) -> { if ("h".equals(tuple)) throw new RuntimeException("Expected Test Exception");});
-                // Expect 7 tuples out of 8
-                Condition<Long> tc = t.getTester().tupleCount(s, 7);
-                complete(t, tc);
-                return true;
-            });
-        }
-        waitForCompletion(completer, executions);
-    }
-    
-    /**
-     * Submit multiple jobs concurrently using PeriodicSource.
-     * @throws Exception on failure
-     */
-    @Test
-    public void testMultiTopologyPollWithError() throws Exception {
-
-        int executions = 4;
-        ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(
-                Executors.newFixedThreadPool(executions));
-        for (int i = 0; i < executions; i++) {
-            completer.submit(() -> {
-                Topology t = newTopology();
-                AtomicLong n = new AtomicLong(0);
-                TStream<Long> s = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
-                // Throw on the 8th tuple
-                s.sink((tuple) -> { if (8 == n.get()) throw new RuntimeException("Expected Test Exception");});
-                // Expect 7 tuples out of 8
-                Condition<Long> tc = t.getTester().tupleCount(s, 7);
-                complete(t, tc);
-                return true;
-            });
-        }
-        waitForCompletion(completer, executions);
-    }
-    
-    @Test
-    public void testJoinWithWindow() throws Exception{
-        Topology t = newTopology();
-        
-        List<Integer> ints = new ArrayList<>();
-        List<Integer> lookupInts = new ArrayList<>();
-        
-        // Ints to populate the window
-        for(int i = 0; i < 100; i++){
-            ints.add(i);
-        }
-        
-        // Ints to lookup partitions in window
-        for(int i = 0; i < 10; i++){
-            lookupInts.add(i);
-        }
-        TStream<Integer> intStream = t.collection(ints);
-        
-        // Wait until the window is populated, and then submit tuples
-        TStream<Integer> lookupIntStream = t.source(() -> {
-            try {
-                Thread.sleep(500);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-            return lookupInts;
-        });
-        
-        TWindow<Integer, Integer> window = intStream.last(10, tuple -> tuple % 10);
-        TStream<Integer> joinsHappened = lookupIntStream.join(tuple -> tuple % 10, window, (number, partitionContents) -> {
-            assertTrue(partitionContents.size() == 10);
-            for(Integer element : partitionContents)
-                assertTrue(number % 10 == element % 10);
-            
-            // Causes an error if two numbers map to the same partition, which shouldn't happen
-            partitionContents.clear();
-            return 0;
-        });
-    
-        Condition<Long> tc = t.getTester().tupleCount(joinsHappened, 10);
-        complete(t, tc);      
-    }
-    
-    @Test
-    public void testJoinLastWithKeyer() throws Exception{
-        Topology t = newTopology();
-        
-        List<Integer> ints = new ArrayList<>();
-        for(int i = 0; i < 100; i++){
-            ints.add(i);
-        }
-        
-        TStream<Integer> intStream = t.collection(ints);
-        
-        // Wait until the window is populated, and then submit tuples
-        TStream<Integer> lookupIntStream = t.source(() -> {
-            try {
-                Thread.sleep(500);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-            return ints;
-        });
-        
-        TStream<String> joinsHappened = lookupIntStream.joinLast(tuple -> tuple, intStream, tuple -> tuple, (a, b) -> {
-            assertTrue(a.equals(b));
-            return "0";
-        });
-
-        Condition<Long> tc = t.getTester().tupleCount(joinsHappened, 100);
-        complete(t, tc);      
-    }
-
-    private void waitForCompletion(ExecutorCompletionService<Boolean> completer, int numtasks) throws ExecutionException {
-        int remainingTasks = numtasks;
-        while (remainingTasks > 0) {
-            try {
-                Future<Boolean> completed = completer.poll(4, TimeUnit.SECONDS);
-                if (completed == null) {
-                    System.err.println("Completer timed out");
-                    throw new RuntimeException(new TimeoutException());
-                }
-                else {
-                    completed.get();
-                }
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-            remainingTasks--;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TWindowTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TWindowTest.java b/api/topology/src/test/java/edgent/test/topology/TWindowTest.java
deleted file mode 100644
index 7d522d9..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TWindowTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static edgent.function.Functions.identity;
-import static edgent.function.Functions.unpartitioned;
-import static edgent.function.Functions.zero;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import edgent.topology.TStream;
-import edgent.topology.TWindow;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-
-@Ignore
-public abstract class TWindowTest extends TopologyAbstractTest{
-    @Test
-    public void testCountBasedBatch() throws Exception{
-        Topology top = newTopology();
-        List<Integer> intList = new ArrayList<>();
-        for(int i = 0; i < 1000;i++)
-            intList.add(i);
-        TStream<Integer> ints = top.source(() -> intList);
-        
-        TWindow<Integer, Integer> window = ints.last(100, tuple -> 0);
-        TStream<Integer> sizes = window.batch((tuples, key) -> {
-            return tuples.size();
-        });
-        Condition<List<Integer> > contents = top.getTester().streamContents(sizes,
-                100,100,100,100,100,100,100,100,100,100);
-        complete(top, contents);
-        assertTrue(contents.valid());
-    }
-    
-    @Test
-    public void testTimeBasedBatch() throws Exception{
-        Topology top = newTopology();
-        TStream<Integer> ints = top.poll(() -> {
-            return 1;
-        }, 10, TimeUnit.MILLISECONDS);
-        
-        TWindow<Integer, Integer> window = ints.last(1000, TimeUnit.MILLISECONDS, tuple -> 0);
-        TStream<Integer> sizes = window.batch((tuples, key) -> {
-            return tuples.size();
-        });
-
-        Condition<List<Integer> > contents = top.getTester().streamContents(sizes,
-           100, 100, 100, 100, 100, 100, 100, 100, 100, 100);
-        complete(top, contents);
-        System.out.println(contents.getResult());
-        for(Integer size : contents.getResult()){
-            assertTrue(size >= 90 && size <= 110);
-        }
-    }
-    
-    @Test
-    public void testKeyedWindowSum() throws Exception {
-        Topology t = newTopology();
-        
-        TStream<Integer> integers = t.collection(Arrays.asList(1,2,3,4,4,3,4,4,3));
-        TWindow<Integer, Integer> window = integers.last(9, identity());
-        assertSame(identity(), window.getKeyFunction());
-        assertSame(t, window.topology());
-        assertSame(integers, window.feeder());
-
-        TStream<Integer> sums = window.aggregate((tuples, key) -> {
-            // All tuples in a partition are equal due to identity
-            assertEquals(1, new HashSet<>(tuples).size());
-            int sum = 0;
-            for(Integer tuple : tuples)
-                sum+=tuple;
-            return sum;
-        });
-        
-        Condition<Long> tc = t.getTester().tupleCount(sums, 9);
-        Condition<List<Integer>> contents = t.getTester().streamContents(sums, 
-                1, 2, 3, 4, 8, 6, 12, 16, 9);
-        complete(t, tc);
-
-        assertTrue(contents.valid());
-    }
-    
-    @Test
-    public void testWindowSum() throws Exception {
-        Topology t = newTopology();
-        
-        TStream<Integer> integers = t.collection(Arrays.asList(1,2,3,4));
-        TWindow<Integer, Integer> window = integers.last(4, unpartitioned());
-        assertSame(unpartitioned(), window.getKeyFunction());
-        TStream<Integer> sums = window.aggregate((tuples, key) -> {
-            assertEquals(Integer.valueOf(0), key);
-            int sum = 0;
-            for(Integer tuple : tuples)
-                sum+=tuple;
-            return sum;
-        });
-
-        Condition<Long> tc = t.getTester().tupleCount(sums, 4);
-        Condition<List<Integer>> contents = t.getTester().streamContents(sums, 1, 3, 6, 10);
-        complete(t, tc);
-
-        assertTrue(contents.valid());
-    }
-    
-    @Test
-    public void testTimeWindowTimeDiff() throws Exception {
-		// Timing variances on shared machines can cause this test to fail
-    	assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-    	
-        Topology t = newTopology();
-        
-        // Define data
-        ConcurrentLinkedQueue<Long> diffs = new ConcurrentLinkedQueue<>();
-        
-        // Poll data
-        TStream<Long> times = t.poll(() -> {   
-            return System.currentTimeMillis();
-        }, 1, TimeUnit.MILLISECONDS);
-
-        TWindow<Long, Integer> window = times.last(1, TimeUnit.SECONDS, unpartitioned());
-        assertSame(zero(), window.getKeyFunction());
-        TStream<Long> diffStream = window.aggregate((tuples, key) -> {
-            assertEquals(Integer.valueOf(0), key);
-            if(tuples.size() < 2){
-                return null;
-            }
-            return tuples.get(tuples.size() -1) - tuples.get(0);
-        });
-        
-        diffStream.sink((tuple) -> diffs.add(tuple));
-        
-        Condition<Long> tc = t.getTester().tupleCount(diffStream, 5000);
-        complete(t, tc);
-        
-        for(Long diff : diffs){
-            assertTrue("Diff is: " + diff, diff >=0 && diff < 1060);
-        }
-        
-    }
-    
-    public static boolean withinTolerance(double expected, Double actual, double tolerance) {
-        double lowBound = (1.0 - tolerance) * expected;
-        double highBound = (1.0 + tolerance) * expected;
-        return (actual < highBound && actual > lowBound);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TopologyAbstractTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TopologyAbstractTest.java b/api/topology/src/test/java/edgent/test/topology/TopologyAbstractTest.java
deleted file mode 100644
index 5bb2248..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TopologyAbstractTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Before;
-import org.junit.Ignore;
-
-import com.google.gson.JsonObject;
-
-import edgent.execution.Job;
-import edgent.execution.Submitter;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.TopologyProvider;
-import edgent.topology.tester.Condition;
-
-@Ignore("Abstract class proiding generic topology testing.")
-public abstract class TopologyAbstractTest implements TopologyTestSetup {
-
-    private TopologyProvider topologyProvider;
-    private Submitter<Topology, Job> submitter;
-
-    @Before
-    public void setup() {
-        topologyProvider = createTopologyProvider();
-        assertNotNull(topologyProvider);
-
-        submitter = createSubmitter();
-        assertNotNull(submitter);
-    }
-
-    protected Submitter<Topology, ?> getSubmitter() {
-        return submitter;
-    }
-
-    @Override
-    public TopologyProvider getTopologyProvider() {
-        return topologyProvider;
-    }
-
-    protected Topology newTopology() {
-        return getTopologyProvider().newTopology();
-    }
-
-    protected Topology newTopology(String name) {
-        return getTopologyProvider().newTopology(name);
-    }
-
-    protected boolean complete(Topology topology, Condition<?> endCondition, long timeout, TimeUnit units) throws Exception {
-        return topology.getTester().complete(getSubmitter(), new JsonObject(), endCondition, timeout, units);
-    }
-
-    protected boolean complete(Topology topology, Condition<?> endCondition) throws Exception {
-        return complete(topology, endCondition, 10, TimeUnit.SECONDS);
-    }
-    
-    public void completeAndValidate(String msg, Topology t,
-            TStream<String> s, int secTimeout, String... expected)
-            throws Exception {
-        completeAndValidate(true/*ordered*/, msg, t, s, secTimeout, expected);
-    }
-    
-    public void completeAndValidate(boolean ordered, String msg, Topology t,
-            TStream<String> s, int secTimeout, String... expected)
-            throws Exception {
-
-        // if expected.length==0 we must run until the job completes or tmo
-        Condition<Long> tc = t.getTester().tupleCount(s, 
-                expected.length == 0 ? Long.MAX_VALUE : expected.length);
-        Condition<List<String>> contents = 
-                ordered ? t.getTester().streamContents(s, expected)
-                        : t.getTester().contentsUnordered(s, expected);
-
-        complete(t, tc, secTimeout, TimeUnit.SECONDS);
-
-        assertTrue(msg + " contents:" + contents.getResult(), contents.valid());
-        if (expected.length != 0)
-            assertTrue("valid:" + tc.getResult(), tc.valid());
-    }
-
-    protected void assertStream(Topology t, TStream<?> s) {
-        assertSame(t, s.topology());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TopologyElementTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TopologyElementTest.java b/api/topology/src/test/java/edgent/test/topology/TopologyElementTest.java
deleted file mode 100644
index 9deba8b..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TopologyElementTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.TopologyElement;
-
-public class TopologyElementTest {
-
-    @Test
-    public void testHierachy() {
-        assertTrue(TopologyElement.class.isAssignableFrom(Topology.class));
-        assertTrue(TopologyElement.class.isAssignableFrom(TStream.class));
-        assertTrue(TopologyElement.class.isAssignableFrom(TSink.class));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TopologyTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TopologyTest.java b/api/topology/src/test/java/edgent/test/topology/TopologyTest.java
deleted file mode 100644
index 1690a11..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TopologyTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import edgent.execution.Job;
-import edgent.execution.mbeans.PeriodMXBean;
-import edgent.execution.services.ControlService;
-import edgent.execution.services.RuntimeServices;
-import edgent.function.Supplier;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-
-@Ignore("abstract, provides common tests for concrete implementations")
-public abstract class TopologyTest extends TopologyAbstractTest {
-
-    @Test
-    public void testBasics() {
-        final Topology t = newTopology("T123");
-        assertEquals("T123", t.getName());
-        assertSame(t, t.topology());
-    }
-
-    @Test
-    public void testDefaultName() {
-        final Topology t = newTopology();
-        assertSame(t, t.topology());
-        assertNotNull(t.getName());
-    }
-
-    @Test
-    public void testStringContants() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings("a", "b", "c");
-        assertStream(t, s);
-
-        Condition<Long> tc = t.getTester().tupleCount(s, 3);
-        Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
-        complete(t, tc);
-        assertTrue(contents.valid());
-    }
-
-    @Test
-    public void testNoStringContants() throws Exception {
-
-        Topology t = newTopology();
-
-        TStream<String> s = t.strings();
-
-        Condition<Long> tc = t.getTester().tupleCount(s, 0);
-
-        complete(t, tc);
-        
-        assertTrue(tc.valid());
-    }
-    
-    @Test
-    public void testRuntimeServices() throws Exception {
-        Topology t = newTopology();
-        TStream<String> s = t.strings("A");
-        
-        Supplier<RuntimeServices> serviceGetter =
-                t.getRuntimeServiceSupplier();
-        
-        TStream<Boolean> b = s.map(tuple -> 
-            serviceGetter.get().getService(ThreadFactory.class) != null
-            && serviceGetter.get().getService(ScheduledExecutorService.class) != null
-        );
-        
-        Condition<List<Boolean>> tc = t.getTester().streamContents(b, Boolean.TRUE);
-        complete(t, tc);
-        
-        assertTrue(tc.valid());
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testAdaptablePoll() throws Exception {
-        // Ensure the API supporting an adaptable poll() is functional.
-        Job job = null;
-        try {
-            Topology t = newTopology();
-            TStream<String> s = t.poll(() -> (new Date()).toString(),
-                    1, TimeUnit.HOURS)
-                    .alias("myAlias");
-            
-            AtomicInteger cnt = new AtomicInteger();
-            s.peek(tuple -> cnt.incrementAndGet()); 
-            
-            Future<Job> jf = (Future<Job>) getSubmitter().submit(t);
-            job = jf.get();
-            assertEquals(Job.State.RUNNING, job.getCurrentState());
-            
-            setPollFrequency(s, 100, TimeUnit.MILLISECONDS);
-            cnt.set(0);
-            Thread.sleep(TimeUnit.SECONDS.toMillis(3));
-            int curCnt = cnt.get();
-            assertTrue("curCnt="+curCnt, curCnt >= 20);
-            
-            setPollFrequency(s, 1, TimeUnit.SECONDS);
-            cnt.set(0);
-            Thread.sleep(TimeUnit.SECONDS.toMillis(3));
-            curCnt = cnt.get();
-            assertTrue("curCnt="+curCnt, curCnt >= 2 && curCnt <= 4);
-            
-            setPollFrequency(s, 100, TimeUnit.MILLISECONDS);
-            cnt.set(0);
-            Thread.sleep(TimeUnit.SECONDS.toMillis(3));
-            curCnt = cnt.get();
-            assertTrue("curCnt="+curCnt, curCnt >= 20);
-        }
-        finally {
-            if (job != null)
-                job.stateChange(Job.Action.CLOSE);
-        }
-        
-    }
-
-    static <T> void setPollFrequency(TStream<T> pollStream, long period, TimeUnit unit) {
-        ControlService cs = pollStream.topology().getRuntimeServiceSupplier()
-                                    .get().getService(ControlService.class);
-        PeriodMXBean control = cs.getControl(TStream.TYPE,
-                                  pollStream.getAlias(), PeriodMXBean.class);
-        control.setPeriod(period, unit);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TopologyTestSetup.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TopologyTestSetup.java b/api/topology/src/test/java/edgent/test/topology/TopologyTestSetup.java
deleted file mode 100644
index 971d936..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TopologyTestSetup.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import edgent.execution.Job;
-import edgent.execution.Submitter;
-import edgent.topology.Topology;
-import edgent.topology.TopologyProvider;
-
-public interface TopologyTestSetup {
-
-    TopologyProvider createTopologyProvider();
-
-    TopologyProvider getTopologyProvider();
-
-    Submitter<Topology, Job> createSubmitter();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/services/TestApplications.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/services/TestApplications.java b/api/topology/src/test/java/edgent/test/topology/services/TestApplications.java
deleted file mode 100644
index 4972c51..0000000
--- a/api/topology/src/test/java/edgent/test/topology/services/TestApplications.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology.services;
-
-import com.google.gson.JsonObject;
-
-import edgent.function.BiConsumer;
-import edgent.topology.Topology;
-import edgent.topology.services.TopologyBuilder;
-
-/**
- * Some dummy test applications that will be part of this
- * jar including having service provider entries.
- * Used for testing the application service.
- *
- */
-public class TestApplications {
-        
-    private static abstract class App implements TopologyBuilder {
-
-        @Override
-        public BiConsumer<Topology, JsonObject> getBuilder() {
-            return (t,c) -> t.strings(getName()).print();
-        }     
-    }
-    
-    public static class AppOne extends App {
-        @Override
-        public String getName() {
-            return "FirstJarApp";
-        }
-    }
-    public static class AppTwo extends App {
-        @Override
-        public String getName() {
-            return "SecondJarApp";
-        }
-    }
-    public static class AppThree extends App {
-        @Override
-        public String getName() {
-            return "ThirdJarApp";
-        }
-    }
-}