You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:39 UTC
[38/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/PlumbingTest.java b/api/topology/src/test/java/edgent/test/topology/PlumbingTest.java
deleted file mode 100644
index cf03668..0000000
--- a/api/topology/src/test/java/edgent/test/topology/PlumbingTest.java
+++ /dev/null
@@ -1,747 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.google.gson.JsonObject;
-
-import edgent.function.BiFunction;
-import edgent.function.Function;
-import edgent.function.Functions;
-import edgent.function.ToIntFunction;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-import edgent.topology.plumbing.Valve;
-import edgent.topology.tester.Condition;
-
-@Ignore
-public abstract class PlumbingTest extends TopologyAbstractTest {
-
-
- @Test
- public void testBlockingDelay() throws Exception {
- // Timing variances on shared machines can cause this test to fail
- assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
- Topology topology = newTopology();
-
- TStream<String> strings = topology.strings("a", "b", "c", "d");
-
- TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
-
- // delay stream
- starts = PlumbingStreams.blockingDelay(starts, 300, TimeUnit.MILLISECONDS);
-
- // calculate display
- starts = starts.modify(v -> System.currentTimeMillis() - v);
-
- starts = starts.filter(v -> v >= 300);
-
- Condition<Long> tc = topology.getTester().tupleCount(starts, 4);
- complete(topology, tc);
- assertTrue("valid:" + tc.getResult(), tc.valid());
- }
-
- @Test
- public void testBlockingThrottle() throws Exception {
- // Timing variances on shared machines can cause this test to fail
- assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
- Topology topology = newTopology();
-
- TStream<String> strings = topology.strings("a", "b", "c", "d");
-
- TStream<Long> emittedDelays = strings.map(v -> 0L);
-
- // throttle stream
- long[] lastEmittedTimestamp = { 0 };
- emittedDelays = PlumbingStreams.blockingThrottle(emittedDelays, 300, TimeUnit.MILLISECONDS)
- .map(t -> {
- // compute the delay since the last emitted tuple
- long now = System.currentTimeMillis();
- if (lastEmittedTimestamp[0] == 0)
- lastEmittedTimestamp[0] = now;
- t = now - lastEmittedTimestamp[0];
- lastEmittedTimestamp[0] = now;
- // System.out.println("### "+t);
- return t;
- })
- .map(t -> {
- // simulate 200ms downstream processing delay
- try {
- Thread.sleep(TimeUnit.MILLISECONDS.toMillis(200));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } return t;
- }) ;
-
- // should end up with throttled delays close to 300 (not 500 like
- // a blockingDelay() under these same conditions would yield)
- emittedDelays = emittedDelays.filter(v -> v <= 320);
-
- Condition<Long> tc = topology.getTester().tupleCount(emittedDelays, 4);
- complete(topology, tc);
- assertTrue("valid:" + tc.getResult(), tc.valid());
- }
-
- @Test
- public void testOneShotDelay() throws Exception {
-
- Topology topology = newTopology();
-
- TStream<String> strings = topology.strings("a", "b", "c", "d");
-
- TStream<Long> starts = strings.map(v -> System.currentTimeMillis());
-
- // delay stream
- starts = PlumbingStreams.blockingOneShotDelay(starts, 300, TimeUnit.MILLISECONDS);
-
- // calculate display
- starts = starts.modify(v -> System.currentTimeMillis() - v);
-
- // the first tuple shouldn't satisfy the predicate
- starts = starts.filter(v -> v < 300);
-
- Condition<Long> tc = topology.getTester().tupleCount(starts, 3);
- complete(topology, tc);
- assertTrue("valid:" + tc.getResult(), tc.valid());
- }
-
- public static class TimeAndId {
- private static AtomicInteger ids = new AtomicInteger();
- long ms;
- final int id;
-
- public TimeAndId() {
- this.ms = System.currentTimeMillis();
- this.id = ids.incrementAndGet();
- }
- public TimeAndId(TimeAndId tai) {
- this.ms = System.currentTimeMillis() - tai.ms;
- this.id = tai.id;
- }
- @Override
- public String toString() {
- return "TAI:" + id + "@" + ms;
- }
-
- }
-
- @Test
- public void testPressureReliever() throws Exception {
- // Timing variances on shared machines can cause this test to fail
- assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
- Topology topology = newTopology();
-
- TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS);
-
-
- TStream<TimeAndId> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 5);
-
- // insert a blocking delay acting as downstream operator that cannot keep up
- TStream<TimeAndId> slow = PlumbingStreams.blockingDelay(pr, 200, TimeUnit.MILLISECONDS);
-
- // calculate the delay
- TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v));
-
- // Also process raw that should be unaffected by the slow path
- TStream<String> processed = raw.asString();
-
-
- Condition<Long> tcSlowCount = topology.getTester().atLeastTupleCount(slow, 20);
- Condition<List<TimeAndId>> tcRaw = topology.getTester().streamContents(raw);
- Condition<List<TimeAndId>> tcSlow = topology.getTester().streamContents(slow);
- Condition<List<TimeAndId>> tcSlowM = topology.getTester().streamContents(slowM);
- Condition<List<String>> tcProcessed = topology.getTester().streamContents(processed);
- complete(topology, tcSlowCount);
-
- assertTrue(tcProcessed.getResult().size() > tcSlowM.getResult().size());
- for (TimeAndId delay : tcSlowM.getResult())
- assertTrue(delay.ms < 300);
-
- // Must not lose any tuples in the non relieving path
- Set<TimeAndId> uniq = new HashSet<>(tcRaw.getResult());
- assertEquals(tcRaw.getResult().size(), uniq.size());
-
- // Must not lose any tuples in the non relieving path
- Set<String> uniqProcessed = new HashSet<>(tcProcessed.getResult());
- assertEquals(tcProcessed.getResult().size(), uniqProcessed.size());
-
- assertEquals(uniq.size(), uniqProcessed.size());
-
- // Might lose tuples, but must not have send duplicates
- uniq = new HashSet<>(tcSlow.getResult());
- assertEquals(tcSlow.getResult().size(), uniq.size());
- }
-
- @Test
- public void testPressureRelieverWithInitialDelay() throws Exception {
-
- Topology topology = newTopology();
-
-
- TStream<String> raw = topology.strings("A", "B", "C", "D", "E", "F", "G", "H");
-
- TStream<String> pr = PlumbingStreams.pressureReliever(raw, v -> 0, 100);
-
- TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 5, TimeUnit.SECONDS);
-
- Condition<Long> tcCount = topology.getTester().tupleCount(pr2, 8);
- Condition<List<String>> contents = topology.getTester().streamContents(pr2, "A", "B", "C", "D", "E", "F", "G", "H");
- complete(topology, tcCount);
-
- assertTrue(tcCount.valid());
- assertTrue(contents.valid());
- }
-
- @Test
- public void testValveState() throws Exception {
- Valve<Integer> valve = new Valve<>();
- assertTrue(valve.isOpen());
-
- valve = new Valve<>(true);
- assertTrue(valve.isOpen());
-
- valve = new Valve<>(false);
- assertFalse(valve.isOpen());
-
- valve.setOpen(true);
- assertTrue(valve.isOpen());
-
- valve.setOpen(false);
- assertFalse(valve.isOpen());
- }
-
- @Test
- public void testValveInitiallyOpen() throws Exception {
- Topology top = newTopology("testValve");
-
- TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-
- Valve<Integer> valve = new Valve<>();
- AtomicInteger cnt = new AtomicInteger();
- TStream<Integer> filtered = values
- .peek(tuple -> {
- // reject 4,5,6
- int curCnt = cnt.incrementAndGet();
- if (curCnt > 6)
- valve.setOpen(true);
- else if (curCnt > 3)
- valve.setOpen(false);
- })
- .filter(valve);
-
- Condition<Long> count = top.getTester().tupleCount(filtered, 7);
- Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 1,2,3,7,8,9,10 );
- complete(top, count);
- assertTrue(contents.getResult().toString(), contents.valid());
- }
-
- @Test
- public void testValveInitiallyClosed() throws Exception {
- Topology top = newTopology("testValve");
-
- TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-
- Valve<Integer> valve = new Valve<>(false);
-
- AtomicInteger cnt = new AtomicInteger();
- TStream<Integer> filtered = values
- .peek(tuple -> {
- // reject all but 4,5,6
- int curCnt = cnt.incrementAndGet();
- if (curCnt > 6)
- valve.setOpen(false);
- else if (curCnt > 3)
- valve.setOpen(true);
- })
- .filter(valve);
-
- Condition<Long> count = top.getTester().tupleCount(filtered, 3);
- Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 4,5,6 );
- complete(top, count);
- assertTrue(contents.getResult().toString(), contents.valid());
- }
-
- private Function<Integer,JsonObject> fakeAnalytic(int channel, long period, TimeUnit unit) {
- return value -> {
- try {
- Thread.sleep(unit.toMillis(period));
- JsonObject jo = new JsonObject();
- jo.addProperty("channel", channel);
- jo.addProperty("result", value);
- return jo;
- } catch (InterruptedException e) {
- throw new RuntimeException("channel="+channel+" interrupted", e);
- }
- };
- }
-
- private Function<TStream<Integer>,TStream<JsonObject>> fakePipeline(int channel, long period, TimeUnit unit) {
- return stream -> stream.map(fakeAnalytic(channel, period, unit)).filter(t->true).tag("pipeline-ch"+channel);
- }
-
- @Test
- public void testConcurrentMap() throws Exception {
- Topology top = newTopology("testConcurrentMap");
-
- int ch = 0;
- List<Function<Integer,JsonObject>> mappers = new ArrayList<>();
- mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
- mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
- mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
- mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
- mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
- mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
- // a couple much faster just in case something's amiss with queues
- mappers.add(fakeAnalytic(ch++, 3, TimeUnit.MILLISECONDS));
- mappers.add(fakeAnalytic(ch++, 13, TimeUnit.MILLISECONDS));
-
- Function<List<JsonObject>,Integer> combiner = list -> {
- int sum = 0;
- int cnt = 0;
- System.out.println("combiner: "+list);
- for(JsonObject jo : list) {
- assertEquals(cnt++, jo.get("channel").getAsInt());
- sum += jo.get("result").getAsInt();
- }
- return sum;
- };
-
- TStream<Integer> values = top.of(1, 2, 3);
- Integer[] resultTuples = new Integer[]{
- 1*mappers.size(),
- 2*mappers.size(),
- 3*mappers.size(),
- };
-
- TStream<Integer> result = PlumbingStreams.concurrentMap(values, mappers, combiner);
-
- Condition<Long> count = top.getTester().tupleCount(result, 3);
- Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
-
- long begin = System.currentTimeMillis();
- complete(top, count);
- long end = System.currentTimeMillis();
-
- assertTrue(contents.getResult().toString(), contents.valid());
-
- long actDuration = end - begin;
- long expMinSerialDuration = resultTuples.length * mappers.size() * 100;
- long expMinDuration = resultTuples.length * 100;
-
- System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
-
- // a gross level performance check w/concurrent channels
- if (Boolean.getBoolean("edgent.build.ci"))
- System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
- else
- assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
- actDuration < 0.5 * expMinSerialDuration);
- }
-
- @Test
- public void testConcurrent() throws Exception {
- Topology top = newTopology("testConcurrent");
-
- int ch = 0;
- List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>();
- pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
- pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
- pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
- pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
- pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
-
- Function<List<JsonObject>,Integer> combiner = list -> {
- int sum = 0;
- int cnt = 0;
- System.out.println("combiner: "+list);
- for(JsonObject jo : list) {
- assertEquals(cnt++, jo.get("channel").getAsInt());
- sum += jo.get("result").getAsInt();
- }
- return sum;
- };
-
- TStream<Integer> values = top.of(1, 2, 3);
- Integer[] resultTuples = new Integer[]{
- 1*pipelines.size(),
- 2*pipelines.size(),
- 3*pipelines.size(),
- };
-
- TStream<Integer> result = PlumbingStreams.concurrent(values, pipelines, combiner).tag("result");
-
- Condition<Long> count = top.getTester().tupleCount(result, 3);
- Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
-
- long begin = System.currentTimeMillis();
- complete(top, count);
- long end = System.currentTimeMillis();
-
- assertTrue(contents.getResult().toString(), contents.valid());
-
- long actDuration = end - begin;
- long expMinSerialDuration = resultTuples.length * pipelines.size() * 100;
- long expMinDuration = resultTuples.length * 100;
-
- System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
-
- // a gross level performance check w/concurrent channels
- if (Boolean.getBoolean("edgent.build.ci"))
- System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
- else
- assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
- actDuration < 0.5 * expMinSerialDuration);
- }
-
- private BiFunction<Integer,Integer,JsonObject> fakeParallelAnalytic(long period, TimeUnit unit) {
- return (value,channel) -> {
- try {
- Thread.sleep(unit.toMillis(period)); // simulate work for this period
- JsonObject jo = new JsonObject();
- jo.addProperty("channel", channel);
- jo.addProperty("result", value);
- return jo;
- } catch (InterruptedException e) {
- throw new RuntimeException("channel="+channel+" interrupted", e);
- }
- };
- }
-
- private BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> fakeParallelPipeline(long period, TimeUnit unit) {
- return (stream,channel) -> stream
- .map(value -> fakeParallelAnalytic(period, unit).apply(value,channel))
- .filter(t->true)
- .tag("pipeline-ch"+channel);
- }
-
- private Function<JsonObject,JsonObject> fakeJsonAnalytic(int channel, long period, TimeUnit unit) {
- return jo -> {
- try {
- Thread.sleep(unit.toMillis(period)); // simulate work for this period
- return jo;
- } catch (InterruptedException e) {
- throw new RuntimeException("channel="+channel+" interrupted", e);
- }
- };
- }
-
- @SuppressWarnings("unused")
- private BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> fakeParallelPipelineTiming(long period, TimeUnit unit) {
- return (stream,channel) -> stream
- .map(jo -> { jo.addProperty("startPipelineMsec", System.currentTimeMillis());
- return jo; })
- .map(fakeJsonAnalytic(channel, period, unit))
- .filter(t->true)
- .map(jo -> { jo.addProperty("endPipelineMsec", System.currentTimeMillis());
- return jo; })
- .tag("pipeline-ch"+channel);
- }
-
- @Test
- public void testParallelMap() throws Exception {
- Topology top = newTopology("testParallelMap");
-
- BiFunction<Integer,Integer,JsonObject> mapper =
- fakeParallelAnalytic(100, TimeUnit.MILLISECONDS);
-
- int width = 5;
- ToIntFunction<Integer> splitter = tuple -> tuple % width;
-
- Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
- TStream<Integer> values = top.of(resultTuples);
-
- TStream<JsonObject> result = PlumbingStreams.parallelMap(values, width, splitter, mapper).tag("result");
- TStream<Integer> result2 = result.map(jo -> {
- int r = jo.get("result").getAsInt();
- assertEquals(splitter.applyAsInt(r), jo.get("channel").getAsInt());
- return r;
- });
-
- Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
- Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
-
- long begin = System.currentTimeMillis();
- complete(top, count);
- long end = System.currentTimeMillis();
-
- assertTrue(contents.getResult().toString(), contents.valid());
-
- long actDuration = end - begin;
- long expMinSerialDuration = resultTuples.length * 100;
- long expMinDuration = (resultTuples.length / width) * 100;
-
- System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
-
- // a gross level performance check w/parallel channels
- if (Boolean.getBoolean("edgent.build.ci"))
- System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
- else
- assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
- actDuration < 0.5 * expMinSerialDuration);
- }
-
- @Test
- public void testParallel() throws Exception {
- Topology top = newTopology("testParallel");
-
- BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline =
- fakeParallelPipeline(100, TimeUnit.MILLISECONDS);
-
- int width = 5;
- ToIntFunction<Integer> splitter = tuple -> tuple % width;
-
- Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
- TStream<Integer> values = top.of(resultTuples);
-
- TStream<JsonObject> result = PlumbingStreams.parallel(values, width, splitter, pipeline).tag("result");
- TStream<Integer> result2 = result.map(jo -> {
- int r = jo.get("result").getAsInt();
- assertEquals(splitter.applyAsInt(r), jo.get("channel").getAsInt());
- return r;
- });
-
- Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
- Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
-
- long begin = System.currentTimeMillis();
- complete(top, count);
- long end = System.currentTimeMillis();
-
- assertTrue(contents.getResult().toString(), contents.valid());
-
- long actDuration = end - begin;
- long expMinSerialDuration = resultTuples.length * 100;
- long expMinDuration = (resultTuples.length / width) * 100;
-
- System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
-
- // a gross level performance check w/parallel channels
- if (Boolean.getBoolean("edgent.build.ci"))
- System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
- else
- assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
- actDuration < 0.5 * expMinSerialDuration);
- }
-
- @Test
- public void testParallelBalanced() throws Exception {
- // May need tweak validation sensitivity or add this:
- // Timing variances on shared machines can cause this test to fail
- // assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
- Topology top = newTopology("testParallelBalanced");
-
- // arrange for even channels to process ~2x as many as odd channels.
- BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline =
- (stream,ch) -> {
- long delay = (ch % 2 == 0) ? 10 : 20;
- return stream.map(fakeAnalytic(ch, delay, TimeUnit.MILLISECONDS));
- };
-
- int width = 4;
- int tupCnt = 60;
- Integer[] resultTuples = new Integer[tupCnt];
- for (int i = 0; i < tupCnt; i++)
- resultTuples[i] = i;
- AtomicInteger[] chCounts = new AtomicInteger[width];
- for (int ch = 0; ch < width; ch++)
- chCounts[ch] = new AtomicInteger();
-
- TStream<Integer> values = top.of(resultTuples);
-
- TStream<JsonObject> result = PlumbingStreams.parallelBalanced(values, width, pipeline).tag("result");
- TStream<Integer> result2 = result.map(jo -> {
- int r = jo.get("result").getAsInt();
- int ch = jo.get("channel").getAsInt();
- chCounts[ch].incrementAndGet();
- return r;
- });
-
- Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
- Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
-
- long begin = System.currentTimeMillis();
- complete(top, count);
- long end = System.currentTimeMillis();
-
- assertTrue(contents.getResult().toString(), contents.valid());
-
- long actDuration = end - begin;
- long expMinSerialDuration = resultTuples.length * 20;
- long expMinDuration = (resultTuples.length / width) * 20;
-
- System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
- System.out.println(top.getName()+" chCounts="+Arrays.asList(chCounts));
-
- // a gross level performance check w/parallel channels
- if (Boolean.getBoolean("edgent.build.ci"))
- System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
- else
- assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
- actDuration < 0.5 * expMinSerialDuration);
-
- int evenChCounts = 0;
- int oddChCounts = 0;
- for (int ch = 0; ch < width; ch++) {
- assertTrue(chCounts[ch].get() != 0);
- if (ch % 2 == 0)
- evenChCounts += chCounts[ch].get();
- else
- oddChCounts += chCounts[ch].get();
- }
- assertTrue(oddChCounts > 0.4 * evenChCounts
- && oddChCounts < 0.6 * evenChCounts);
- }
-
-// @Test
-// public void testParallelTiming() throws Exception {
-// Topology top = newTopology("testParallelTiming");
-//
-// BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> pipeline =
-// fakeParallelPipelineTiming(100, TimeUnit.MILLISECONDS);
-//
-// int width = 5;
-// // ToIntFunction<Integer> splitter = tuple -> tuple % width;
-// ToIntFunction<JsonObject> splitter = jo -> jo.get("result").getAsInt() % width;
-//
-// Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
-// TStream<Integer> values = top.of(resultTuples);
-//
-// TStream<JsonObject> inStream = values.map(value -> {
-// JsonObject jo = new JsonObject();
-// jo.addProperty("result", value);
-// jo.addProperty("channel", splitter.applyAsInt(jo));
-// jo.addProperty("enterParallelMsec", System.currentTimeMillis());
-// return jo;
-// });
-// TStream<JsonObject> result = PlumbingStreams.parallel(inStream, width, splitter, pipeline).tag("result");
-// TStream<Integer> result2 = result.map(jo -> {
-// jo.addProperty("exitParallelMsec", System.currentTimeMillis());
-// System.out.println("ch="+jo.get("channel").getAsInt()
-// +" endPipeline-startPipeline="
-// +(jo.get("endPipelineMsec").getAsLong()
-// - jo.get("startPipelineMsec").getAsLong())
-// +" exitParallel-startPipeine="
-// +(jo.get("exitParallelMsec").getAsLong()
-// - jo.get("startPipelineMsec").getAsLong()));
-// int r = jo.get("result").getAsInt();
-// assertEquals(splitter.applyAsInt(jo), jo.get("channel").getAsInt());
-// return r;
-// });
-//
-// Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
-// Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
-// long begin = System.currentTimeMillis();
-// complete(top, count);
-// long end = System.currentTimeMillis();
-// assertTrue(contents.getResult().toString(), contents.valid());
-//
-// long actDuration = end - begin;
-//
-// long expMinSerialDuration = resultTuples.length * 100;
-// long expMinDuration = (resultTuples.length / width) * 100;
-//
-// System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
-//
-// // a gross level performance check w/parallel channels
-// assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
-// actDuration < 0.5 * expMinSerialDuration);
-// }
-
- @Test
- public void testGate() throws Exception {
- Topology topology = newTopology("testGate");
-
- TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
-
- Semaphore semaphore = new Semaphore(1);
- raw = PlumbingStreams.gate(raw, semaphore);
-
- ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
- ArrayList<Integer> arrayResult = new ArrayList<>();
- for (int i = 0; i < 5; i++) {
- arrayResult.add(0);
- arrayResult.add(1);
- }
-
- raw.sink(t -> {
- //Add 0 to list because semaphore.acquire() in sync has occurred
- resultAvailablePermits.add(semaphore.availablePermits());
- semaphore.release();
- //Add 1 to list because semaphore.release() has executed
- resultAvailablePermits.add(semaphore.availablePermits());
- });
-
- Condition<List<String>> contents = topology.getTester()
- .streamContents(raw, "a", "b", "c", "d", "e");
- complete(topology, contents);
-
- assertTrue("valid:" + contents.getResult(), contents.valid());
- assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
- }
-
- @Test
- public void testGateWithLocking() throws Exception {
- Topology topology = newTopology("testGateWithLocking");
-
- TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
-
- Semaphore semaphore = new Semaphore(3);
- raw = PlumbingStreams.gate(raw, semaphore);
-
- ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
- ArrayList<Integer> arrayResult = new ArrayList<>();
- arrayResult.add(2);
- arrayResult.add(1);
- arrayResult.add(0);
-
- raw.sink(t -> {
- //Add number of availablePermits
- resultAvailablePermits.add(semaphore.availablePermits());
- });
-
- Condition<List<String>> contents = topology.getTester().streamContents(raw, "a", "b", "c");
- complete(topology, contents, 1000, TimeUnit.MILLISECONDS);
-
- assertTrue("valid:" + contents.getResult(), contents.valid());
- assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TStreamTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TStreamTest.java b/api/topology/src/test/java/edgent/test/topology/TStreamTest.java
deleted file mode 100644
index f24a322..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TStreamTest.java
+++ /dev/null
@@ -1,876 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.TWindow;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-
-@Ignore
-public abstract class TStreamTest extends TopologyAbstractTest {
-
- @Test
- public void testAlias() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a", "b");
- assertEquals(null, s.getAlias());
-
- TStream<String> s2 = s.alias("sAlias");
- assertSame(s, s2);
- assertEquals("sAlias", s.getAlias());
-
- try {
- s.alias("another"); // expect ISE - alias already set
- assertTrue(false);
- } catch (IllegalStateException e) {
- ; // expected
- }
-
- // test access at runtime
- s2 = s.peek(tuple -> {
- assertEquals("sAlias", s.getAlias());
- }).filter(tuple -> true);
-
- // just verify that alias presence doesn't otherwise muck up things
- Condition<Long> tc = t.getTester().tupleCount(s2, 2);
- Condition<List<String>> contents = t.getTester().streamContents(s2, "a", "b");
- complete(t, tc);
-
- assertTrue("contents "+contents.getResult(), contents.valid());
- }
-
- @Test
- public void testTag() throws Exception {
-
- Topology t = newTopology();
-
- List<String> tags = new ArrayList<>(Arrays.asList("tag1", "tag2"));
-
- TStream<String> s = t.strings("a", "b");
- assertEquals(0, s.getTags().size());
-
- TStream<String> s2 = s.tag("tag1", "tag2");
- assertSame(s, s2);
- assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
-
- tags.add("tag3");
- s.tag("tag3");
- assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
-
- s.tag("tag3", "tag2", "tag1"); // ok to redundantly add
- assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
-
- // test access at runtime
- s2 = s.peek(tuple -> {
- assertTrue("s.tags="+s.getTags(), s.getTags().containsAll(tags));
- }).filter(tuple -> true);
-
- // just verify that tag presence doesn't otherwise muck up things
- Condition<Long> tc = t.getTester().tupleCount(s2, 2);
- Condition<List<String>> contents = t.getTester().streamContents(s2, "a", "b");
- complete(t, tc);
-
- assertTrue("contents "+contents.getResult(), contents.valid());
- }
-
- @Test
- public void testFilter() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a", "b", "c");
- s = s.filter(tuple -> "b".equals(tuple));
- assertStream(t, s);
-
- Condition<Long> tc = t.getTester().tupleCount(s, 1);
- Condition<List<String>> contents = t.getTester().streamContents(s, "b");
- complete(t, tc);
-
- assertTrue(contents.valid());
- }
-
- /**
- * Test Peek. This will only work with an embedded setup.
- *
- * @throws Exception
- */
- @Test
- public void testPeek() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a", "b", "c");
- List<String> peekedValues = new ArrayList<>();
- TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple));
- assertSame(s, speek);
-
- Condition<Long> tc = t.getTester().tupleCount(s, 3);
- Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
- complete(t, tc);
-
- assertTrue(contents.valid());
- assertEquals(contents.getResult(), peekedValues);
- }
-
- @Test
- public void testMultiplePeek() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a", "b", "c");
- List<String> peekedValues = new ArrayList<>();
- TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple + "1st"));
- assertSame(s, speek);
-
- TStream<String> speek2 = s.peek(tuple -> peekedValues.add(tuple + "2nd"));
- assertSame(s, speek2);
- TStream<String> speek3 = s.peek(tuple -> peekedValues.add(tuple + "3rd"));
- assertSame(s, speek3);
-
- Condition<Long> tc = t.getTester().tupleCount(s, 3);
- Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
- complete(t, tc);
-
- assertTrue(contents.valid());
- List<String> expected = Arrays.asList("a1st", "a2nd", "a3rd", "b1st", "b2nd", "b3rd", "c1st", "c2nd", "c3rd");
- assertEquals(expected, peekedValues);
- }
-
- @Test
- public void testMap() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("32", "423", "-746");
- TStream<Integer> i = s.map(Integer::valueOf);
- assertStream(t, i);
-
- Condition<Long> tc = t.getTester().tupleCount(i, 3);
- Condition<List<Integer>> contents = t.getTester().streamContents(i, 32, 423, -746);
- complete(t, tc);
-
- assertTrue(contents.getResult().toString(), contents.valid());
- }
-
- @Test
- public void testModifyWithDrops() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("32", "423", "-746");
- TStream<Integer> i = s.map(Integer::valueOf);
- i = i.modify(tuple -> tuple < 0 ? null : tuple + 27);
- assertStream(t, i);
-
- Condition<Long> tc = t.getTester().tupleCount(i, 2);
- Condition<List<Integer>> contents = t.getTester().streamContents(i, 59, 450);
- complete(t, tc);
-
- assertTrue(contents.getResult().toString(), contents.valid());
- }
-
- @Test
- public void testModify() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a", "b", "c");
- TStream<String> i = s.modify(tuple -> tuple.concat("M"));
- assertStream(t, i);
-
- Condition<Long> tc = t.getTester().tupleCount(i, 3);
- Condition<List<String>> contents = t.getTester().streamContents(i, "aM", "bM", "cM");
- complete(t, tc);
-
- assertTrue(contents.valid());
- }
-
- @Test
- public void tesFlattMap() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("mary had a little lamb",
- "its fleece was white as snow");
- TStream<String> w = s.flatMap(tuple->Arrays.asList(tuple.split(" ")));
- assertStream(t, w);
-
- Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
- "a", "little", "lamb", "its", "fleece", "was", "white", "as",
- "snow");
- complete(t, contents);
-
- assertTrue(contents.getResult().toString(), contents.valid());
- }
-
- @Test
- public void tesFlattMapWithNullIterator() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("mary had a little lamb", "NOTUPLES",
- "its fleece was white as snow");
- TStream<String> w = s.flatMap(tuple->tuple.equals("NOTUPLES") ? null : Arrays.asList(tuple.split(" ")));
- assertStream(t, w);
-
- Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
- "a", "little", "lamb", "its", "fleece", "was", "white", "as",
- "snow");
- complete(t, contents);
-
- assertTrue(contents.getResult().toString(), contents.valid());
- }
-
- @Test
- public void tesFlattMapWithNullValues() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("mary had a little lamb",
- "its fleece was white as snow");
- TStream<String> w = s.flatMap(tuple-> {List<String> values = Arrays.asList(tuple.split(" "));
- values.set(2, null); values.set(4, null); return values;});
- assertStream(t, w);
-
- Condition<List<String>> contents = t.getTester().streamContents(w, "mary", "had",
- "little", "its", "fleece", "white",
- "snow");
- complete(t, contents);
-
- assertTrue(contents.getResult().toString(), contents.valid());
- }
-
- /**
- * Test split() with no drops.
- * @throws Exception on failure
- */
- @Test
- public void testSplit() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a1", "b1", "a2", "c1", "e1", "c2", "c3", "b2", "a3", "b3", "d1", "e2");
- List<TStream<String>> splits = s.split(3, tuple -> tuple.charAt(0) - 'a');
-
- Condition<Long> tc0 = t.getTester().tupleCount(splits.get(0), 4);
- Condition<Long> tc1 = t.getTester().tupleCount(splits.get(1), 5);
- Condition<Long> tc2 = t.getTester().tupleCount(splits.get(2), 3);
- Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(0), "a1", "a2", "a3", "d1");
- Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(1), "b1", "e1", "b2", "b3", "e2");
- Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(2), "c1", "c2", "c3");
-
- complete(t, t.getTester().and(tc0, tc1, tc2));
-
- assertTrue(contents0.toString(), contents0.valid());
- assertTrue(contents1.toString(), contents1.valid());
- assertTrue(contents2.toString(), contents2.valid());
- }
-
- /**
- * Test split() with drops.
- * @throws Exception on failure
- */
- @Test
- public void testSplitWithDrops() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a1", "b1", "a2", "c1", "e1", "c2", "c3", "b2", "a3", "b3", "d1", "e2");
- List<TStream<String>> splits = s.split(3, tuple -> {
- switch (tuple.charAt(0)) {
- case 'a':
- return 1;
- case 'b':
- return 4;
- case 'c':
- return 8;
- case 'd':
- return -34;
- case 'e':
- return -1;
- default:
- return -1;
- }
- });
-
- Condition<Long> tc0 = t.getTester().tupleCount(splits.get(0), 0);
- Condition<Long> tc1 = t.getTester().tupleCount(splits.get(1), 6);
- Condition<Long> tc2 = t.getTester().tupleCount(splits.get(2), 3);
- Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(0));
- Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(1), "a1", "b1", "a2", "b2", "a3",
- "b3");
- Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(2), "c1", "c2", "c3");
-
- complete(t, t.getTester().and(tc0, tc1, tc2));
-
- assertTrue(contents0.toString(), contents0.valid());
- assertTrue(contents1.toString(), contents1.valid());
- assertTrue(contents2.toString(), contents2.valid());
- }
-
- /**
- * Test split() zero outputs
- * @throws Exception on failure
- */
- @Test(expected = IllegalArgumentException.class)
- public void testSplitWithZeroOutputs() throws Exception {
- newTopology().strings("a1").split(0, tuple -> 0);
- }
-
- /**
- * Test split() negative outputs
- * @throws Exception on failure
- */
- @Test(expected = IllegalArgumentException.class)
- public void testSplitWithNegativeOutputs() throws Exception {
- newTopology().strings("a1").split(-28, tuple -> 0);
- }
-
- /**
- * Test enum data structure
- */
- private enum LogSeverityEnum {
- ALERT(1), CRITICAL(2), ERROR(3), WARNING(4), NOTICE(5), INFO(6), DEBUG(7);
-
- @SuppressWarnings("unused")
- private final int code;
-
- LogSeverityEnum(final int code) {
- this.code = code;
- }
- }
-
- /**
- * Test split(enum) with integer type enum.
- * @throws Exception on failure
- */
- @Test
- public void testSplitWithEnum() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("Log1_ALERT", "Log2_INFO", "Log3_INFO", "Log4_INFO", "Log5_ERROR", "Log6_ERROR", "Log7_CRITICAL");
- TStream<String> i = s.map(String::toString);
- EnumMap<LogSeverityEnum,TStream<String>> splits = i.split(LogSeverityEnum.class, e -> LogSeverityEnum.valueOf(e.split("_")[1]));
-
- assertStream(t, i);
-
- Condition<Long> tc0 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ALERT), 1);
- Condition<Long> tc1 = t.getTester().tupleCount(splits.get(LogSeverityEnum.INFO), 3);
- Condition<Long> tc2 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ERROR), 2);
- Condition<Long> tc3 = t.getTester().tupleCount(splits.get(LogSeverityEnum.CRITICAL), 1);
- Condition<Long> tc4 = t.getTester().tupleCount(splits.get(LogSeverityEnum.WARNING), 0);
-
- Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(LogSeverityEnum.ALERT), "Log1_ALERT");
- Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(LogSeverityEnum.INFO), "Log2_INFO",
- "Log3_INFO", "Log4_INFO");
- Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(LogSeverityEnum.ERROR), "Log5_ERROR",
- "Log6_ERROR");
- Condition<List<String>> contents3 = t.getTester().streamContents(splits.get(LogSeverityEnum.CRITICAL), "Log7_CRITICAL");
- Condition<List<String>> contents4 = t.getTester().streamContents(splits.get(LogSeverityEnum.WARNING));
-
- complete(t, t.getTester().and(tc0, tc1, tc2, tc3, tc4));
-
-
- assertTrue(contents0.toString(), contents0.valid());
- assertTrue(contents1.toString(), contents1.valid());
- assertTrue(contents2.toString(), contents2.valid());
- assertTrue(contents3.toString(), contents3.valid());
- assertTrue(contents4.toString(), contents4.valid());
- }
-
- private enum EnumClassWithZerosize {
- ;
- }
-
- /**
- * Test split(enum) with integer type enum.
- * @throws Exception on failure
- */
- @Test(expected = IllegalArgumentException.class)
- public void testSplitWithEnumForZeroSizeClass() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("Test");
- s.split(EnumClassWithZerosize.class, e -> EnumClassWithZerosize.valueOf("Test"));
- }
-
- @Test
- public void testFanout2() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a", "b", "c");
- TStream<String> sf = s.filter(tuple -> "b".equals(tuple));
- TStream<String> sm = s.modify(tuple -> tuple.concat("fo2"));
-
- Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
- Condition<List<String>> tsf = t.getTester().streamContents(sf, "b");
- Condition<List<String>> tsm = t.getTester().streamContents(sm, "afo2", "bfo2", "cfo2");
-
- complete(t, t.getTester().and(tsm, tsmc));
-
- assertTrue(tsf.getResult().toString(), tsf.valid());
- assertTrue(tsm.getResult().toString(), tsm.valid());
- }
- @Test
- public void testFanout3() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a", "b", "cde");
- TStream<String> sf = s.filter(tuple -> "b".equals(tuple));
- TStream<String> sm = s.modify(tuple -> tuple.concat("fo2"));
- TStream<byte[]> st = s.map(tuple -> tuple.getBytes());
-
- Condition<Long> tsfc = t.getTester().tupleCount(sf, 1);
- Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
- Condition<Long> tstc = t.getTester().tupleCount(st, 3);
- Condition<List<String>> tsf = t.getTester().streamContents(sf, "b");
- Condition<List<String>> tsm = t.getTester().streamContents(sm, "afo2", "bfo2", "cdefo2");
- Condition<List<byte[]>> tst = t.getTester().streamContents(st, "a".getBytes(), "b".getBytes(), "cde".getBytes());
-
- complete(t, t.getTester().and(tsfc, tsmc, tstc));
-
- assertTrue(tsf.valid());
- assertTrue(tsm.valid());
-
- // Can't use equals on byte[]
- assertEquals(3, tst.getResult().size());
- assertEquals("a", new String(tst.getResult().get(0)));
- assertEquals("b", new String(tst.getResult().get(1)));
- assertEquals("cde", new String(tst.getResult().get(2)));
- }
-
- @Test
- public void testPeekThenFanout() throws Exception {
- _testFanoutWithPeek(1, 0, 0);
- }
-
- @Test
- public void testFanoutThenPeek() throws Exception {
- _testFanoutWithPeek(0, 0, 1);
- }
-
- @Test
- public void testPeekMiddleFanout() throws Exception {
- _testFanoutWithPeek(0, 1, 0);
- }
-
- @Test
- public void testMultiPeekFanout() throws Exception {
- _testFanoutWithPeek(3, 3, 3);
- }
-
- void _testFanoutWithPeek(int numBefore, int numMiddle, int numAfter) throws Exception {
-
- Topology t = newTopology();
-
- List<Peeked> values = new ArrayList<>();
- values.add(new Peeked(33));
- values.add(new Peeked(-214));
- values.add(new Peeked(9234));
- for (Peeked p : values)
- assertEquals(0, p.peekedCnt);
-
- TStream<Peeked> s = t.collection(values);
- if (numBefore > 0) {
- for (int i = 0; i < numBefore; i++)
- s.peek(tuple -> tuple.peekedCnt++);
- }
-
- TStream<Peeked> sf = s.filter(tuple -> tuple.value > 0);
- if (numMiddle > 0) {
- for (int i = 0; i < numMiddle; i++)
- s.peek(tuple -> tuple.peekedCnt++);
- }
- TStream<Peeked> sm = s.modify(tuple -> new Peeked(tuple.value + 37, tuple.peekedCnt));
-
- if (numAfter > 0) {
- for (int i = 0; i < numAfter; i++)
- s.peek(tuple -> tuple.peekedCnt++);
- }
-
- int totPeeks = numBefore + numMiddle + numAfter;
- Condition<Long> tsfc = t.getTester().tupleCount(sf, 2);
- Condition<Long> tsmc = t.getTester().tupleCount(sm, 3);
- Condition<List<Peeked>> tsf = t.getTester().streamContents(sf, new Peeked(33, totPeeks), new Peeked(9234, totPeeks));
- Condition<List<Peeked>> tsm = t.getTester().streamContents(sm, new Peeked(70, totPeeks), new Peeked(-177, totPeeks),
- new Peeked(9271, totPeeks));
-
- complete(t, t.getTester().and(tsfc, tsmc));
-
- assertTrue(tsf.getResult().toString(), tsf.valid());
- assertTrue(tsm.getResult().toString(), tsm.valid());
- }
-
- public static class Peeked implements Serializable {
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + peekedCnt;
- result = prime * result + value;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Peeked other = (Peeked) obj;
- if (peekedCnt != other.peekedCnt)
- return false;
- if (value != other.value)
- return false;
- return true;
- }
-
- private static final long serialVersionUID = 1L;
- final int value;
- int peekedCnt;
-
- Peeked(int value) {
- this.value = value;
- }
-
- Peeked(int value, boolean peeked) {
- this(value, 1);
- }
-
- Peeked(int value, int peekedCnt) {
- this.value = value;
- // this.peeked = true;
- this.peekedCnt = peekedCnt;
- }
-
- public String toString() {
- return "{" + "value=" + value + " peekedCnt=" + peekedCnt + "}";
- }
- }
-
- /**
- * Test Union with itself.
- *
- * @throws Exception
- */
- @Test
- public void testUnionWithSelf() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a", "b", "c");
-
- assertSame(s, s.union(s));
- assertSame(s, s.union(Collections.emptySet()));
- assertSame(s, s.union(Collections.singleton(s)));
- }
-
- @Test
- public void testUnion2() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s1 = t.strings("a", "b", "c");
- TStream<String> s2 = t.strings("d", "e");
- TStream<String> su = s1.union(s2);
- assertNotSame(s1, su);
- assertNotSame(s2, su);
- TStream<String> r = su.modify(v -> v.concat("X"));
-
- Condition<Long> tc = t.getTester().tupleCount(r, 5);
- Condition<List<String>> contents = t.getTester().contentsUnordered(r,
- "aX", "bX", "cX", "dX", "eX");
- complete(t, tc);
-
- assertTrue(tc.getResult().toString(), tc.valid());
- assertTrue(contents.getResult().toString(), contents.valid());
- }
-
- @Test
- public void testUnion4() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s1 = t.strings("a", "b", "c");
- TStream<String> s2 = t.strings("d", "e");
- TStream<String> s3 = t.strings("f", "g", "h", "i");
- TStream<String> s4 = t.strings("j");
- TStream<String> su = s1.union(new HashSet<>(Arrays.asList(s2, s3, s4)));
- assertNotSame(s1, su);
- assertNotSame(s2, su);
- assertNotSame(s3, su);
- assertNotSame(s4, su);
- TStream<String> r = su.modify(v -> v.concat("Y"));
-
- Condition<Long> tc = t.getTester().tupleCount(r, 10);
- Condition<List<String>> contents = t.getTester().contentsUnordered(r,
- "aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
- complete(t, tc);
-
- assertTrue(tc.getResult().toString(), tc.valid());
- assertTrue(contents.getResult().toString(), contents.valid());
- }
-
- @Test
- public void testUnion4WithSelf() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s1 = t.strings("a", "b", "c");
- TStream<String> s2 = t.strings("d", "e");
- TStream<String> s3 = t.strings("f", "g", "h", "i");
- TStream<String> s4 = t.strings("j");
- TStream<String> su = s1.union(new HashSet<>(Arrays.asList(s1, s2, s3, s4)));
- assertNotSame(s1, su);
- assertNotSame(s2, su);
- assertNotSame(s3, su);
- assertNotSame(s4, su);
- TStream<String> r = su.modify(v -> v.concat("Y"));
-
- Condition<Long> tc = t.getTester().tupleCount(r, 10);
- Condition<List<String>> contents = t.getTester().contentsUnordered(r,
- "aY", "bY", "cY", "dY", "eY", "fY", "gY", "hY", "iY", "jY");
- complete(t, tc);
-
- assertTrue(tc.getResult().toString(), tc.valid());
- assertTrue(contents.getResult().toString(), contents.valid());
- }
-
- @Test
- public void testSink() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a", "b", "c");
-
- List<String> sinked = new ArrayList<>();
- TSink<String> terminal = s.sink(tuple -> sinked.add(tuple));
- assertNotNull(terminal);
- assertSame(t, terminal.topology());
- assertSame(s, terminal.getFeed());
- TStream<String> s1 = s.filter(tuple -> true);
-
- Condition<Long> tc = t.getTester().tupleCount(s1, 3);
- complete(t, tc);
-
- assertEquals("a", sinked.get(0));
- assertEquals("b", sinked.get(1));
- assertEquals("c", sinked.get(2));
- }
-
- /**
- * Submit multiple jobs concurrently using ProcessSource.
- * @throws Exception on failure
- */
- @Test
- public void testMultiTopology() throws Exception {
-
- int executions = 4;
- ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(
- Executors.newFixedThreadPool(executions));
- for (int i = 0; i < executions; i++) {
- completer.submit(() -> {
- Topology t = newTopology();
- TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h");
- s.sink((tuple) -> { if ("h".equals(tuple)) System.out.println(tuple);});
- Condition<Long> tc = t.getTester().tupleCount(s, 8);
- complete(t, tc);
- return true;
- });
- }
- waitForCompletion(completer, executions);
- }
-
- /**
- * Submit multiple jobs concurrently using ProcessSource.
- * @throws Exception on failure
- */
- @Test
- public void testMultiTopologyWithError() throws Exception {
-
- int executions = 4;
- ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(
- Executors.newFixedThreadPool(executions));
- for (int i = 0; i < executions; i++) {
- completer.submit(() -> {
- Topology t = newTopology();
- TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h");
- // Throw on the 8th tuple
- s.sink((tuple) -> { if ("h".equals(tuple)) throw new RuntimeException("Expected Test Exception");});
- // Expect 7 tuples out of 8
- Condition<Long> tc = t.getTester().tupleCount(s, 7);
- complete(t, tc);
- return true;
- });
- }
- waitForCompletion(completer, executions);
- }
-
- /**
- * Submit multiple jobs concurrently using PeriodicSource.
- * @throws Exception on failure
- */
- @Test
- public void testMultiTopologyPollWithError() throws Exception {
-
- int executions = 4;
- ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(
- Executors.newFixedThreadPool(executions));
- for (int i = 0; i < executions; i++) {
- completer.submit(() -> {
- Topology t = newTopology();
- AtomicLong n = new AtomicLong(0);
- TStream<Long> s = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
- // Throw on the 8th tuple
- s.sink((tuple) -> { if (8 == n.get()) throw new RuntimeException("Expected Test Exception");});
- // Expect 7 tuples out of 8
- Condition<Long> tc = t.getTester().tupleCount(s, 7);
- complete(t, tc);
- return true;
- });
- }
- waitForCompletion(completer, executions);
- }
-
- @Test
- public void testJoinWithWindow() throws Exception{
- Topology t = newTopology();
-
- List<Integer> ints = new ArrayList<>();
- List<Integer> lookupInts = new ArrayList<>();
-
- // Ints to populate the window
- for(int i = 0; i < 100; i++){
- ints.add(i);
- }
-
- // Ints to lookup partitions in window
- for(int i = 0; i < 10; i++){
- lookupInts.add(i);
- }
- TStream<Integer> intStream = t.collection(ints);
-
- // Wait until the window is populated, and then submit tuples
- TStream<Integer> lookupIntStream = t.source(() -> {
- try {
- Thread.sleep(500);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return lookupInts;
- });
-
- TWindow<Integer, Integer> window = intStream.last(10, tuple -> tuple % 10);
- TStream<Integer> joinsHappened = lookupIntStream.join(tuple -> tuple % 10, window, (number, partitionContents) -> {
- assertTrue(partitionContents.size() == 10);
- for(Integer element : partitionContents)
- assertTrue(number % 10 == element % 10);
-
- // Causes an error if two numbers map to the same partition, which shouldn't happen
- partitionContents.clear();
- return 0;
- });
-
- Condition<Long> tc = t.getTester().tupleCount(joinsHappened, 10);
- complete(t, tc);
- }
-
- @Test
- public void testJoinLastWithKeyer() throws Exception{
- Topology t = newTopology();
-
- List<Integer> ints = new ArrayList<>();
- for(int i = 0; i < 100; i++){
- ints.add(i);
- }
-
- TStream<Integer> intStream = t.collection(ints);
-
- // Wait until the window is populated, and then submit tuples
- TStream<Integer> lookupIntStream = t.source(() -> {
- try {
- Thread.sleep(500);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return ints;
- });
-
- TStream<String> joinsHappened = lookupIntStream.joinLast(tuple -> tuple, intStream, tuple -> tuple, (a, b) -> {
- assertTrue(a.equals(b));
- return "0";
- });
-
- Condition<Long> tc = t.getTester().tupleCount(joinsHappened, 100);
- complete(t, tc);
- }
-
- private void waitForCompletion(ExecutorCompletionService<Boolean> completer, int numtasks) throws ExecutionException {
- int remainingTasks = numtasks;
- while (remainingTasks > 0) {
- try {
- Future<Boolean> completed = completer.poll(4, TimeUnit.SECONDS);
- if (completed == null) {
- System.err.println("Completer timed out");
- throw new RuntimeException(new TimeoutException());
- }
- else {
- completed.get();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- remainingTasks--;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TWindowTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TWindowTest.java b/api/topology/src/test/java/edgent/test/topology/TWindowTest.java
deleted file mode 100644
index 7d522d9..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TWindowTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static edgent.function.Functions.identity;
-import static edgent.function.Functions.unpartitioned;
-import static edgent.function.Functions.zero;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import edgent.topology.TStream;
-import edgent.topology.TWindow;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-
-@Ignore
-public abstract class TWindowTest extends TopologyAbstractTest{
- @Test
- public void testCountBasedBatch() throws Exception{
- Topology top = newTopology();
- List<Integer> intList = new ArrayList<>();
- for(int i = 0; i < 1000;i++)
- intList.add(i);
- TStream<Integer> ints = top.source(() -> intList);
-
- TWindow<Integer, Integer> window = ints.last(100, tuple -> 0);
- TStream<Integer> sizes = window.batch((tuples, key) -> {
- return tuples.size();
- });
- Condition<List<Integer> > contents = top.getTester().streamContents(sizes,
- 100,100,100,100,100,100,100,100,100,100);
- complete(top, contents);
- assertTrue(contents.valid());
- }
-
- @Test
- public void testTimeBasedBatch() throws Exception{
- Topology top = newTopology();
- TStream<Integer> ints = top.poll(() -> {
- return 1;
- }, 10, TimeUnit.MILLISECONDS);
-
- TWindow<Integer, Integer> window = ints.last(1000, TimeUnit.MILLISECONDS, tuple -> 0);
- TStream<Integer> sizes = window.batch((tuples, key) -> {
- return tuples.size();
- });
-
- Condition<List<Integer> > contents = top.getTester().streamContents(sizes,
- 100, 100, 100, 100, 100, 100, 100, 100, 100, 100);
- complete(top, contents);
- System.out.println(contents.getResult());
- for(Integer size : contents.getResult()){
- assertTrue(size >= 90 && size <= 110);
- }
- }
-
- @Test
- public void testKeyedWindowSum() throws Exception {
- Topology t = newTopology();
-
- TStream<Integer> integers = t.collection(Arrays.asList(1,2,3,4,4,3,4,4,3));
- TWindow<Integer, Integer> window = integers.last(9, identity());
- assertSame(identity(), window.getKeyFunction());
- assertSame(t, window.topology());
- assertSame(integers, window.feeder());
-
- TStream<Integer> sums = window.aggregate((tuples, key) -> {
- // All tuples in a partition are equal due to identity
- assertEquals(1, new HashSet<>(tuples).size());
- int sum = 0;
- for(Integer tuple : tuples)
- sum+=tuple;
- return sum;
- });
-
- Condition<Long> tc = t.getTester().tupleCount(sums, 9);
- Condition<List<Integer>> contents = t.getTester().streamContents(sums,
- 1, 2, 3, 4, 8, 6, 12, 16, 9);
- complete(t, tc);
-
- assertTrue(contents.valid());
- }
-
- @Test
- public void testWindowSum() throws Exception {
- Topology t = newTopology();
-
- TStream<Integer> integers = t.collection(Arrays.asList(1,2,3,4));
- TWindow<Integer, Integer> window = integers.last(4, unpartitioned());
- assertSame(unpartitioned(), window.getKeyFunction());
- TStream<Integer> sums = window.aggregate((tuples, key) -> {
- assertEquals(Integer.valueOf(0), key);
- int sum = 0;
- for(Integer tuple : tuples)
- sum+=tuple;
- return sum;
- });
-
- Condition<Long> tc = t.getTester().tupleCount(sums, 4);
- Condition<List<Integer>> contents = t.getTester().streamContents(sums, 1, 3, 6, 10);
- complete(t, tc);
-
- assertTrue(contents.valid());
- }
-
- @Test
- public void testTimeWindowTimeDiff() throws Exception {
- // Timing variances on shared machines can cause this test to fail
- assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
- Topology t = newTopology();
-
- // Define data
- ConcurrentLinkedQueue<Long> diffs = new ConcurrentLinkedQueue<>();
-
- // Poll data
- TStream<Long> times = t.poll(() -> {
- return System.currentTimeMillis();
- }, 1, TimeUnit.MILLISECONDS);
-
- TWindow<Long, Integer> window = times.last(1, TimeUnit.SECONDS, unpartitioned());
- assertSame(zero(), window.getKeyFunction());
- TStream<Long> diffStream = window.aggregate((tuples, key) -> {
- assertEquals(Integer.valueOf(0), key);
- if(tuples.size() < 2){
- return null;
- }
- return tuples.get(tuples.size() -1) - tuples.get(0);
- });
-
- diffStream.sink((tuple) -> diffs.add(tuple));
-
- Condition<Long> tc = t.getTester().tupleCount(diffStream, 5000);
- complete(t, tc);
-
- for(Long diff : diffs){
- assertTrue("Diff is: " + diff, diff >=0 && diff < 1060);
- }
-
- }
-
- public static boolean withinTolerance(double expected, Double actual, double tolerance) {
- double lowBound = (1.0 - tolerance) * expected;
- double highBound = (1.0 + tolerance) * expected;
- return (actual < highBound && actual > lowBound);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TopologyAbstractTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TopologyAbstractTest.java b/api/topology/src/test/java/edgent/test/topology/TopologyAbstractTest.java
deleted file mode 100644
index 5bb2248..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TopologyAbstractTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Before;
-import org.junit.Ignore;
-
-import com.google.gson.JsonObject;
-
-import edgent.execution.Job;
-import edgent.execution.Submitter;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.TopologyProvider;
-import edgent.topology.tester.Condition;
-
-@Ignore("Abstract class proiding generic topology testing.")
-public abstract class TopologyAbstractTest implements TopologyTestSetup {
-
- private TopologyProvider topologyProvider;
- private Submitter<Topology, Job> submitter;
-
- @Before
- public void setup() {
- topologyProvider = createTopologyProvider();
- assertNotNull(topologyProvider);
-
- submitter = createSubmitter();
- assertNotNull(submitter);
- }
-
- protected Submitter<Topology, ?> getSubmitter() {
- return submitter;
- }
-
- @Override
- public TopologyProvider getTopologyProvider() {
- return topologyProvider;
- }
-
- protected Topology newTopology() {
- return getTopologyProvider().newTopology();
- }
-
- protected Topology newTopology(String name) {
- return getTopologyProvider().newTopology(name);
- }
-
- protected boolean complete(Topology topology, Condition<?> endCondition, long timeout, TimeUnit units) throws Exception {
- return topology.getTester().complete(getSubmitter(), new JsonObject(), endCondition, timeout, units);
- }
-
- protected boolean complete(Topology topology, Condition<?> endCondition) throws Exception {
- return complete(topology, endCondition, 10, TimeUnit.SECONDS);
- }
-
- public void completeAndValidate(String msg, Topology t,
- TStream<String> s, int secTimeout, String... expected)
- throws Exception {
- completeAndValidate(true/*ordered*/, msg, t, s, secTimeout, expected);
- }
-
- public void completeAndValidate(boolean ordered, String msg, Topology t,
- TStream<String> s, int secTimeout, String... expected)
- throws Exception {
-
- // if expected.length==0 we must run until the job completes or tmo
- Condition<Long> tc = t.getTester().tupleCount(s,
- expected.length == 0 ? Long.MAX_VALUE : expected.length);
- Condition<List<String>> contents =
- ordered ? t.getTester().streamContents(s, expected)
- : t.getTester().contentsUnordered(s, expected);
-
- complete(t, tc, secTimeout, TimeUnit.SECONDS);
-
- assertTrue(msg + " contents:" + contents.getResult(), contents.valid());
- if (expected.length != 0)
- assertTrue("valid:" + tc.getResult(), tc.valid());
- }
-
- protected void assertStream(Topology t, TStream<?> s) {
- assertSame(t, s.topology());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TopologyElementTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TopologyElementTest.java b/api/topology/src/test/java/edgent/test/topology/TopologyElementTest.java
deleted file mode 100644
index 9deba8b..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TopologyElementTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.TopologyElement;
-
-public class TopologyElementTest {
-
- @Test
- public void testHierachy() {
- assertTrue(TopologyElement.class.isAssignableFrom(Topology.class));
- assertTrue(TopologyElement.class.isAssignableFrom(TStream.class));
- assertTrue(TopologyElement.class.isAssignableFrom(TSink.class));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TopologyTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TopologyTest.java b/api/topology/src/test/java/edgent/test/topology/TopologyTest.java
deleted file mode 100644
index 1690a11..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TopologyTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import edgent.execution.Job;
-import edgent.execution.mbeans.PeriodMXBean;
-import edgent.execution.services.ControlService;
-import edgent.execution.services.RuntimeServices;
-import edgent.function.Supplier;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-
-@Ignore("abstract, provides common tests for concrete implementations")
-public abstract class TopologyTest extends TopologyAbstractTest {
-
- @Test
- public void testBasics() {
- final Topology t = newTopology("T123");
- assertEquals("T123", t.getName());
- assertSame(t, t.topology());
- }
-
- @Test
- public void testDefaultName() {
- final Topology t = newTopology();
- assertSame(t, t.topology());
- assertNotNull(t.getName());
- }
-
- @Test
- public void testStringContants() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings("a", "b", "c");
- assertStream(t, s);
-
- Condition<Long> tc = t.getTester().tupleCount(s, 3);
- Condition<List<String>> contents = t.getTester().streamContents(s, "a", "b", "c");
- complete(t, tc);
- assertTrue(contents.valid());
- }
-
- @Test
- public void testNoStringContants() throws Exception {
-
- Topology t = newTopology();
-
- TStream<String> s = t.strings();
-
- Condition<Long> tc = t.getTester().tupleCount(s, 0);
-
- complete(t, tc);
-
- assertTrue(tc.valid());
- }
-
- @Test
- public void testRuntimeServices() throws Exception {
- Topology t = newTopology();
- TStream<String> s = t.strings("A");
-
- Supplier<RuntimeServices> serviceGetter =
- t.getRuntimeServiceSupplier();
-
- TStream<Boolean> b = s.map(tuple ->
- serviceGetter.get().getService(ThreadFactory.class) != null
- && serviceGetter.get().getService(ScheduledExecutorService.class) != null
- );
-
- Condition<List<Boolean>> tc = t.getTester().streamContents(b, Boolean.TRUE);
- complete(t, tc);
-
- assertTrue(tc.valid());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testAdaptablePoll() throws Exception {
- // Ensure the API supporting an adaptable poll() is functional.
- Job job = null;
- try {
- Topology t = newTopology();
- TStream<String> s = t.poll(() -> (new Date()).toString(),
- 1, TimeUnit.HOURS)
- .alias("myAlias");
-
- AtomicInteger cnt = new AtomicInteger();
- s.peek(tuple -> cnt.incrementAndGet());
-
- Future<Job> jf = (Future<Job>) getSubmitter().submit(t);
- job = jf.get();
- assertEquals(Job.State.RUNNING, job.getCurrentState());
-
- setPollFrequency(s, 100, TimeUnit.MILLISECONDS);
- cnt.set(0);
- Thread.sleep(TimeUnit.SECONDS.toMillis(3));
- int curCnt = cnt.get();
- assertTrue("curCnt="+curCnt, curCnt >= 20);
-
- setPollFrequency(s, 1, TimeUnit.SECONDS);
- cnt.set(0);
- Thread.sleep(TimeUnit.SECONDS.toMillis(3));
- curCnt = cnt.get();
- assertTrue("curCnt="+curCnt, curCnt >= 2 && curCnt <= 4);
-
- setPollFrequency(s, 100, TimeUnit.MILLISECONDS);
- cnt.set(0);
- Thread.sleep(TimeUnit.SECONDS.toMillis(3));
- curCnt = cnt.get();
- assertTrue("curCnt="+curCnt, curCnt >= 20);
- }
- finally {
- if (job != null)
- job.stateChange(Job.Action.CLOSE);
- }
-
- }
-
- static <T> void setPollFrequency(TStream<T> pollStream, long period, TimeUnit unit) {
- ControlService cs = pollStream.topology().getRuntimeServiceSupplier()
- .get().getService(ControlService.class);
- PeriodMXBean control = cs.getControl(TStream.TYPE,
- pollStream.getAlias(), PeriodMXBean.class);
- control.setPeriod(period, unit);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/TopologyTestSetup.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/TopologyTestSetup.java b/api/topology/src/test/java/edgent/test/topology/TopologyTestSetup.java
deleted file mode 100644
index 971d936..0000000
--- a/api/topology/src/test/java/edgent/test/topology/TopologyTestSetup.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import edgent.execution.Job;
-import edgent.execution.Submitter;
-import edgent.topology.Topology;
-import edgent.topology.TopologyProvider;
-
-public interface TopologyTestSetup {
-
- TopologyProvider createTopologyProvider();
-
- TopologyProvider getTopologyProvider();
-
- Submitter<Topology, Job> createSubmitter();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/services/TestApplications.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/services/TestApplications.java b/api/topology/src/test/java/edgent/test/topology/services/TestApplications.java
deleted file mode 100644
index 4972c51..0000000
--- a/api/topology/src/test/java/edgent/test/topology/services/TestApplications.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology.services;
-
-import com.google.gson.JsonObject;
-
-import edgent.function.BiConsumer;
-import edgent.topology.Topology;
-import edgent.topology.services.TopologyBuilder;
-
-/**
- * Some dummy test applications that will be part of this
- * jar including having service provider entries.
- * Used for testing the application service.
- *
- */
-public class TestApplications {
-
- private static abstract class App implements TopologyBuilder {
-
- @Override
- public BiConsumer<Topology, JsonObject> getBuilder() {
- return (t,c) -> t.strings(getName()).print();
- }
- }
-
- public static class AppOne extends App {
- @Override
- public String getName() {
- return "FirstJarApp";
- }
- }
- public static class AppTwo extends App {
- @Override
- public String getName() {
- return "SecondJarApp";
- }
- }
- public static class AppThree extends App {
- @Override
- public String getName() {
- return "ThirdJarApp";
- }
- }
-}