You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:46 UTC
[45/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/function/src/test/java/edgent/test/function/FunctionsTest.java
----------------------------------------------------------------------
diff --git a/api/function/src/test/java/edgent/test/function/FunctionsTest.java b/api/function/src/test/java/edgent/test/function/FunctionsTest.java
deleted file mode 100644
index cfce455..0000000
--- a/api/function/src/test/java/edgent/test/function/FunctionsTest.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.function;
-
-import static edgent.function.Functions.identity;
-import static edgent.function.Functions.unpartitioned;
-import static edgent.function.Functions.zero;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-
-import edgent.function.Consumer;
-import edgent.function.Function;
-import edgent.function.Functions;
-import edgent.function.Supplier;
-
-public class FunctionsTest {
-
- int add;
-
- @Test
- public void testSynchronizedFunction() {
-
- Function<Integer,String> f1 = v -> Integer.toString(v);
- assertSame(f1, Functions.synchronizedFunction(f1));
-
- Function<Integer,Integer> f2 = v -> v * 2;
- assertSame(f2, Functions.synchronizedFunction(f2));
-
- int a = 7;
- Function<Integer,Integer> f3 = v -> v + a;
- assertSame(f3, Functions.synchronizedFunction(f3));
-
- int[] aa = new int[1];
- Function<Integer,Integer> f4 = v -> v + aa[0];
- assertNotSame(f4, Functions.synchronizedFunction(f4));
-
- aa[0] = 7;
- int r4 = f4.apply(2);
- assertEquals(9, r4);
- aa[0] = 13;
- r4 = f4.apply(9);
- assertEquals(22, r4);
-
- // verify a synchronized function uses the function reference
- // as the synchronization
- Function<Integer,Integer> f5 = new F5();
- add = 99;
- try {
- f5.apply(8);
- fail("Expected IllegalMonitorStateException");
- } catch (IllegalMonitorStateException e) {
- // expected
- }
- Function<Integer,Integer> f5s = Functions.synchronizedFunction(f5);
- assertNotSame(f5, f5s);
- int r5s = f5s.apply(18);
- assertEquals(117, r5s);
-
- Function<Integer,Integer> f6 = new F6();
- assertNotSame(f6, Functions.synchronizedFunction(f6));
- }
-
- class F5 implements Function<Integer,Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer apply(Integer value) {
- notify();
- return add + value;
- }
- }
- static class F6 implements Function<Integer,Integer>, AutoCloseable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer apply(Integer value) {
- return value;
- }
- @Override
- public void close() throws Exception {
- }
- }
-
- @Test
- public void testSynchronizedSupplier() {
-
- Supplier<Integer> f1 = () -> 3;
- assertSame(f1, Functions.synchronizedSupplier(f1));
-
- int a = 7;
- Supplier<Integer> f2 = () -> a;
- assertSame(f2, Functions.synchronizedSupplier(f2));
- int r2 = f2.get();
- assertEquals(7, r2);
-
-
- int[] aa = new int[1];
- Supplier<Integer> f3 = () -> aa[0];
- assertNotSame(f3, Functions.synchronizedSupplier(f3));
-
- aa[0] = 7;
- int r3 = f3.get();
- assertEquals(7, r3);
- aa[0] = 13;
- r3 = f3.get();
- assertEquals(13, r3);
-
- // verify a synchronized function uses the function reference
- // as the synchronization
- Supplier<Integer> f4 = new S4();
- add = 127;
- try {
- f4.get();
- fail("Expected IllegalMonitorStateException");
- } catch (IllegalMonitorStateException e) {
- // expected
- }
- Supplier<Integer> f4s = Functions.synchronizedSupplier(f4);
- assertNotSame(f4, f4s);
- int r4s = f4s.get();
- assertEquals(127, r4s);
-
- Supplier<Integer> f5 = new S5();
- assertNotSame(f5, Functions.synchronizedSupplier(f5));
- }
- class S4 implements Supplier<Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer get() {
- notify();
- return add;
- }
- }
- static class S5 implements Supplier<Integer>, AutoCloseable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer get() {
- return 0;
- }
- @Override
- public void close() throws Exception {
- }
- }
-
- static void consumer(Integer x) {
- }
-
- @Test
- public void testSynchronizedConsumer() {
-
- Consumer<Integer> f1 = v -> {};
- assertSame(f1, Functions.synchronizedConsumer(f1));
-
- int a = 7;
- Consumer<Integer> f2 = v -> consumer(a + v);
- assertSame(f2, Functions.synchronizedConsumer(f2));
-
- int[] aa = new int[1];
- Consumer<Integer> f3 = v -> aa[0] = v;
- assertNotSame(f3, Functions.synchronizedConsumer(f3));
-
- aa[0] = 7;
- f3.accept(85);
- assertEquals(85, aa[0]);
-
- // verify a synchronized function uses the function reference
- // as the synchronization
- Consumer<Integer> f4 = new C4();
- add = 127;
- try {
- f4.accept(99);
- fail("Expected IllegalMonitorStateException");
- } catch (IllegalMonitorStateException e) {
- // expected
- }
- Consumer<Integer> f4s = Functions.synchronizedConsumer(f4);
- assertNotSame(f4, f4s);
- f4s.accept(2421);
- assertEquals(2421, add);
-
- Consumer<Integer> f5 = new C5();
- assertNotSame(f5, Functions.synchronizedConsumer(f5));
- }
- class C4 implements Consumer<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void accept(Integer value) {
- notify();
- add = value;
- }
-
- }
- static class C5 implements Consumer<Integer>, AutoCloseable {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void accept(Integer value) {
- }
- @Override
- public void close() throws Exception {
- }
- }
-
- @Test
- public void testIdentity() {
- String s = "hello";
- assertSame(s, identity().apply(s));
-
- Integer i = 42;
- assertSame(i, identity().apply(i));
-
- Object o = new Object();
- assertSame(o, identity().apply(o));
- }
-
- @Test
- public void testZero() {
- String s = "hello";
- assertEquals(Integer.valueOf(0), zero().apply(s));
-
- Integer i = 42;
- assertEquals(Integer.valueOf(0), zero().apply(i));
-
- Object o = new Object();
- assertEquals(Integer.valueOf(0), zero().apply(o));
- }
- @Test
- public void testUnpartitioned() {
- String s = "hello";
- assertEquals(Integer.valueOf(0), unpartitioned().apply(s));
-
- Integer i = 42;
- assertEquals(Integer.valueOf(0), unpartitioned().apply(i));
-
- Object o = new Object();
- assertEquals(Integer.valueOf(0), unpartitioned().apply(o));
- }
- @Test
- public void testAlwaysTrue() {
- assertTrue(Functions.alwaysTrue().test("hello"));
- assertTrue(Functions.alwaysTrue().test(42));
- assertTrue(Functions.alwaysTrue().test(new Object()));
- }
- @Test
- public void testAlwaysFalse() {
- assertFalse(Functions.alwaysFalse().test("hello"));
- assertFalse(Functions.alwaysFalse().test(42));
- assertFalse(Functions.alwaysFalse().test(new Object()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/function/src/test/java/org/apache/edgent/test/function/FunctionsTest.java
----------------------------------------------------------------------
diff --git a/api/function/src/test/java/org/apache/edgent/test/function/FunctionsTest.java b/api/function/src/test/java/org/apache/edgent/test/function/FunctionsTest.java
new file mode 100644
index 0000000..903f071
--- /dev/null
+++ b/api/function/src/test/java/org/apache/edgent/test/function/FunctionsTest.java
@@ -0,0 +1,276 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.function;
+
+import static org.apache.edgent.function.Functions.identity;
+import static org.apache.edgent.function.Functions.unpartitioned;
+import static org.apache.edgent.function.Functions.zero;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Functions;
+import org.apache.edgent.function.Supplier;
+import org.junit.Test;
+
+public class FunctionsTest {
+
+ int add;
+
+ @Test
+ public void testSynchronizedFunction() {
+
+ Function<Integer,String> f1 = v -> Integer.toString(v);
+ assertSame(f1, Functions.synchronizedFunction(f1));
+
+ Function<Integer,Integer> f2 = v -> v * 2;
+ assertSame(f2, Functions.synchronizedFunction(f2));
+
+ int a = 7;
+ Function<Integer,Integer> f3 = v -> v + a;
+ assertSame(f3, Functions.synchronizedFunction(f3));
+
+ int[] aa = new int[1];
+ Function<Integer,Integer> f4 = v -> v + aa[0];
+ assertNotSame(f4, Functions.synchronizedFunction(f4));
+
+ aa[0] = 7;
+ int r4 = f4.apply(2);
+ assertEquals(9, r4);
+ aa[0] = 13;
+ r4 = f4.apply(9);
+ assertEquals(22, r4);
+
+ // verify a synchronized function uses the function reference
+ // as the synchronization monitor (F5.apply() calls notify() on itself)
+ Function<Integer,Integer> f5 = new F5();
+ add = 99;
+ try {
+ f5.apply(8);
+ fail("Expected IllegalMonitorStateException");
+ } catch (IllegalMonitorStateException e) {
+ // expected
+ }
+ Function<Integer,Integer> f5s = Functions.synchronizedFunction(f5);
+ assertNotSame(f5, f5s);
+ int r5s = f5s.apply(18);
+ assertEquals(117, r5s);
+
+ Function<Integer,Integer> f6 = new F6();
+ assertNotSame(f6, Functions.synchronizedFunction(f6));
+ }
+
+ class F5 implements Function<Integer,Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer apply(Integer value) {
+ notify();
+ return add + value;
+ }
+ }
+ static class F6 implements Function<Integer,Integer>, AutoCloseable {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer apply(Integer value) {
+ return value;
+ }
+ @Override
+ public void close() throws Exception {
+ }
+ }
+
+ @Test
+ public void testSynchronizedSupplier() {
+
+ Supplier<Integer> f1 = () -> 3;
+ assertSame(f1, Functions.synchronizedSupplier(f1));
+
+ int a = 7;
+ Supplier<Integer> f2 = () -> a;
+ assertSame(f2, Functions.synchronizedSupplier(f2));
+ int r2 = f2.get();
+ assertEquals(7, r2);
+
+
+ int[] aa = new int[1];
+ Supplier<Integer> f3 = () -> aa[0];
+ assertNotSame(f3, Functions.synchronizedSupplier(f3));
+
+ aa[0] = 7;
+ int r3 = f3.get();
+ assertEquals(7, r3);
+ aa[0] = 13;
+ r3 = f3.get();
+ assertEquals(13, r3);
+
+ // verify a synchronized supplier uses the supplier reference
+ // as the synchronization monitor (S4.get() calls notify() on itself)
+ Supplier<Integer> f4 = new S4();
+ add = 127;
+ try {
+ f4.get();
+ fail("Expected IllegalMonitorStateException");
+ } catch (IllegalMonitorStateException e) {
+ // expected
+ }
+ Supplier<Integer> f4s = Functions.synchronizedSupplier(f4);
+ assertNotSame(f4, f4s);
+ int r4s = f4s.get();
+ assertEquals(127, r4s);
+
+ Supplier<Integer> f5 = new S5();
+ assertNotSame(f5, Functions.synchronizedSupplier(f5));
+ }
+ class S4 implements Supplier<Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer get() {
+ notify();
+ return add;
+ }
+ }
+ static class S5 implements Supplier<Integer>, AutoCloseable {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer get() {
+ return 0;
+ }
+ @Override
+ public void close() throws Exception {
+ }
+ }
+
+ static void consumer(Integer x) {
+ }
+
+ @Test
+ public void testSynchronizedConsumer() {
+
+ Consumer<Integer> f1 = v -> {};
+ assertSame(f1, Functions.synchronizedConsumer(f1));
+
+ int a = 7;
+ Consumer<Integer> f2 = v -> consumer(a + v);
+ assertSame(f2, Functions.synchronizedConsumer(f2));
+
+ int[] aa = new int[1];
+ Consumer<Integer> f3 = v -> aa[0] = v;
+ assertNotSame(f3, Functions.synchronizedConsumer(f3));
+
+ aa[0] = 7;
+ f3.accept(85);
+ assertEquals(85, aa[0]);
+
+ // verify a synchronized consumer uses the consumer reference
+ // as the synchronization monitor (C4.accept() calls notify() on itself)
+ Consumer<Integer> f4 = new C4();
+ add = 127;
+ try {
+ f4.accept(99);
+ fail("Expected IllegalMonitorStateException");
+ } catch (IllegalMonitorStateException e) {
+ // expected
+ }
+ Consumer<Integer> f4s = Functions.synchronizedConsumer(f4);
+ assertNotSame(f4, f4s);
+ f4s.accept(2421);
+ assertEquals(2421, add);
+
+ Consumer<Integer> f5 = new C5();
+ assertNotSame(f5, Functions.synchronizedConsumer(f5));
+ }
+ class C4 implements Consumer<Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void accept(Integer value) {
+ notify();
+ add = value;
+ }
+
+ }
+ static class C5 implements Consumer<Integer>, AutoCloseable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void accept(Integer value) {
+ }
+ @Override
+ public void close() throws Exception {
+ }
+ }
+
+ @Test
+ public void testIdentity() {
+ String s = "hello";
+ assertSame(s, identity().apply(s));
+
+ Integer i = 42;
+ assertSame(i, identity().apply(i));
+
+ Object o = new Object();
+ assertSame(o, identity().apply(o));
+ }
+
+ @Test
+ public void testZero() {
+ String s = "hello";
+ assertEquals(Integer.valueOf(0), zero().apply(s));
+
+ Integer i = 42;
+ assertEquals(Integer.valueOf(0), zero().apply(i));
+
+ Object o = new Object();
+ assertEquals(Integer.valueOf(0), zero().apply(o));
+ }
+ @Test
+ public void testUnpartitioned() {
+ String s = "hello";
+ assertEquals(Integer.valueOf(0), unpartitioned().apply(s));
+
+ Integer i = 42;
+ assertEquals(Integer.valueOf(0), unpartitioned().apply(i));
+
+ Object o = new Object();
+ assertEquals(Integer.valueOf(0), unpartitioned().apply(o));
+ }
+ @Test
+ public void testAlwaysTrue() {
+ assertTrue(Functions.alwaysTrue().test("hello"));
+ assertTrue(Functions.alwaysTrue().test(42));
+ assertTrue(Functions.alwaysTrue().test(new Object()));
+ }
+ @Test
+ public void testAlwaysFalse() {
+ assertFalse(Functions.alwaysFalse().test("hello"));
+ assertFalse(Functions.alwaysFalse().test(42));
+ assertFalse(Functions.alwaysFalse().test(new Object()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/edgent/graph/Connector.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/edgent/graph/Connector.java b/api/graph/src/main/java/edgent/graph/Connector.java
deleted file mode 100644
index 9b54bfa..0000000
--- a/api/graph/src/main/java/edgent/graph/Connector.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.graph;
-
-import java.util.Set;
-
-import edgent.oplet.core.Peek;
-
-/**
- * A {@code Connector} represents an output port of a {@code Vertex}.
- *
- * A {@code Connector} supports two methods to add processing for tuples
- * submitted to the port:
- * <UL>
- * <LI>{@link #connect(Vertex, int)} : Connect this to an input port of another
- * {@code Vertex}. Any number of connections can be made. Any tuple submitted by
- * the output port will appear on all connections made through this method. For
- * any tuple {@code t} ordering of appearance across the connected input ports
- * is not guaranteed.</LI>
- * <LI>{@link #peek(Peek)} : Insert a peek after the output port and before any
- * connections made by {@link #connect(Vertex, int)}. Multiple peeks can be
- * inserted. A tuple {@code t} submitted by the output port will be seen by all
- * peek oplets. The ordering of the peek is guaranteed such that the peeks
- * are processed in the order they were added to this {@code Connector} with the
- * {@code t} being seen first by the first peek added.
- * <LI>
- * </UL>
- * For example with peeks {@code P1,P2,P3} added in that order and connections
- * {@code C1,C2} added, the graph will be logically:
- *
- * <pre>
- * {@code
- * -->C1
- * port-->P1-->P2-->P3--|
- * -->C2
- * }
- * </pre>
- *
- * A tuple {@code t} submitted by the port will be peeked at by {@code P1}, then
- * {@code P2} then {@code P3}. After {@code P3} peeked at the tuple, {@code C1}
- * and {@code C2} will process the tuple in an arbitrary order.
- *
- * @param <T>
- * Type of the data item produced by the output port
- */
-public interface Connector<T> {
-
- /**
- * Gets the {@code Graph} for this {@code Connector}.
- *
- * @return the {@code Graph} for this {@code Connector}.
- */
- Graph graph();
-
- /**
- * Connect this {@code Connector} to the specified target's input. This
- * method may be called multiple times to fan out to multiple input ports.
- * Each tuple submitted to this output port will be processed by all
- * connections.
- *
- * @param target
- * the {@code Vertex} to connect to
- * @param inputPort
- * the index of the target's input port to connect to.
- */
- void connect(Vertex<?, T, ?> target, int inputPort);
-
- /**
- * Was connect() called on this connector?
- *
- * @return true if connected
- */
- boolean isConnected();
-
- /**
- * Inserts a {@code Peek} oplet between an output port and its
- * connections. This method may be called multiple times to insert multiple
- * peeks. Each tuple submitted to this output port will be seen by all peeks
- * in order of their insertion, starting with the first peek inserted.
- *
- * @param <N> Peek oplet type
- * @param oplet
- * Oplet to insert.
- * @return {@code output}
- */
- <N extends Peek<T>> Connector<T> peek(N oplet);
-
- /**
- * Adds the specified tags to the connector. Adding the same tag
- * multiple times will not change the result beyond the initial
- * application. An unconnected connector can be tagged.
- *
- * @param values
- * Tag values.
- */
- void tag(String... values);
-
- /**
- * Returns the set of tags associated with this connector.
- *
- * @return set of tag values.
- */
- Set<String> getTags();
-
- /**
- * Set the alias for the connector.
- * <p>
- * The alias must be unique within the topology.
- * The alias may be used in various contexts:
- * </p>
- * <ul>
- * <li>Runtime control services for the Connector (stream/outputport)
- * are registered with this alias.</li>
- * </ul>
- *
- * @param alias the alias
- * @throws IllegalStateException if the an alias has already been set
- */
- void alias(String alias);
-
- /**
- * Returns the alias for the connector if any.
- * @return the alias. null if one has not be set.
- */
- String getAlias();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/edgent/graph/Edge.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/edgent/graph/Edge.java b/api/graph/src/main/java/edgent/graph/Edge.java
deleted file mode 100644
index 08dc0c7..0000000
--- a/api/graph/src/main/java/edgent/graph/Edge.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.graph;
-
-import java.util.Set;
-
-/**
- * Represents an edge between two Vertices.
- */
-public interface Edge {
-
- /**
- * Returns the source vertex.
- * @return the source vertex.
- */
- Vertex<?, ?, ?> getSource();
-
- /**
- * Returns the source output port index.
- * @return the source output port index.
- */
- int getSourceOutputPort();
-
- /**
- * Returns the target vertex.
- * @return the target vertex.
- */
- Vertex<?, ?, ?> getTarget();
-
- /**
- * Returns the target input port index.
- * @return the target input port index.
- */
- int getTargetInputPort();
-
- /**
- * Returns the set of tags associated with this edge.
- *
- * @return set of tag values.
- */
- Set<String> getTags();
-
- /**
- * Returns the alias associated with this edge.
- *
- * @return the alias. null if none.
- */
- String getAlias();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/edgent/graph/Graph.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/edgent/graph/Graph.java b/api/graph/src/main/java/edgent/graph/Graph.java
deleted file mode 100644
index bb9c008..0000000
--- a/api/graph/src/main/java/edgent/graph/Graph.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.graph;
-
-import java.util.Collection;
-
-import edgent.function.Predicate;
-import edgent.function.Supplier;
-import edgent.oplet.Oplet;
-import edgent.oplet.core.Peek;
-import edgent.oplet.core.Source;
-
-/**
- * A generic directed graph of vertices, connectors and edges.
- * <p>
- * The graph consists of {@link Vertex} objects, each having
- * 0 or more input and/or output {@link Connector} objects.
- * {@link Edge} objects connect an output connector to
- * an input connector.
- * <p>
- * A vertex has an associated {@link Oplet} instance that will be executed
- * at runtime.
- */
-public interface Graph {
-
- /**
- * Add a new unconnected {@code Vertex} into the graph.
- * <p>
- *
- * @param <N> an Oplet type
- * @param <C> tuple type of input streams
- * @param <P> tuple type of output streams
- * @param oplet the oplet to associate with the new vertex
- * @param inputs the number of input connectors for the new vertex
- * @param outputs the number of output connectors for the new vertex
- * @return the newly created {@code Vertex} for the oplet
- */
- <N extends Oplet<C, P>, C, P> Vertex<N, C, P> insert(N oplet, int inputs, int outputs);
-
- /**
- * Create a new unconnected {@link Vertex} associated with the
- * specified source {@link Oplet}.
- * <p>
- * The {@code Vertex} for the oplet has 0 input connectors and one output connector.
- * @param <N> a Source type
- * @param <P> tuple type
- * @param oplet the source oplet
- * @return the output connector for the newly created vertex.
- */
- <N extends Source<P>, P> Connector<P> source(N oplet);
-
- /**
- * Create a new connected {@link Vertex} associated with the
- * specified {@link Oplet}.
- * <p>
- * The new {@code Vertex} has one input and one output {@code Connector}.
- * An {@link Edge} is created connecting the specified output connector to
- * the new vertice's input connector.
- *
- * @param <N> an Oplet type
- * @param <C> tuple type of input streams
- * @param <P> tuple type of output streams
- * @param output the output connector to connect to the vertice's input connector
- * @param oplet the oplet to associate with the new {@code Vertex}
- * @return the output connector for the new {@code Vertex}
- */
- <N extends Oplet<C, P>, C, P> Connector<P> pipe(Connector<C> output, N oplet);
-
- /**
- * Insert Peek oplets returned by the specified {@code Supplier} into
- * the outputs of all of the oplets which satisfy the specified
- * {@code Predicate} and where the output's {@link Connector#isConnected()}
- * is true.
- *
- * @param supplier
- * Function which provides a Peek oplet to insert
- * @param select
- * Vertex selection Predicate
- */
- void peekAll(Supplier<? extends Peek<?>> supplier, Predicate<Vertex<?, ?, ?>> select);
-
- /**
- * Return an unmodifiable view of all vertices in this graph.
- *
- * @return unmodifiable view of all vertices in this graph
- */
- Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> getVertices();
-
- /**
- * Return an unmodifiable view of all edges in this graph.
- * @return the collection
- */
- Collection<Edge> getEdges();
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/edgent/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/edgent/graph/Vertex.java b/api/graph/src/main/java/edgent/graph/Vertex.java
deleted file mode 100644
index 4e05333..0000000
--- a/api/graph/src/main/java/edgent/graph/Vertex.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.graph;
-
-import java.util.List;
-
-import edgent.oplet.Oplet;
-
-/**
- * A {@code Vertex} in a graph.
- * <p>
- * A {@code Vertex} has an {@link Oplet} instance
- * that will be executed at runtime and zero or
- * more input ports and zero or more output ports.
- * Each output port is represented by a {@link Connector} instance.
- *
- * @param <N> the type of the {@code Oplet}
- * @param <C> Data type the oplet consumes in its input ports.
- * @param <P> Data type the oplet produces on its output ports.
- */
-public interface Vertex<N extends Oplet<C, P>, C, P> {
-
- /**
- * Get the vertice's {@link Graph}.
- * @return the graph
- */
- Graph graph();
-
- /**
- * Get the instance of the oplet that will be executed.
- *
- * @return the oplet
- */
- N getInstance();
-
- /**
- * Get the vertice's collection of output connectors.
- * @return an immutable collection of the output connectors.
- */
- List<? extends Connector<P>> getConnectors();
-
- /**
- * Add an output port to the vertex.
- * @return {@code Connector} representing the output port.
- */
- Connector<P> addOutput();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/edgent/graph/package-info.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/edgent/graph/package-info.java b/api/graph/src/main/java/edgent/graph/package-info.java
deleted file mode 100644
index d8c0af8..0000000
--- a/api/graph/src/main/java/edgent/graph/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Low-level graph building API.
- */
-package edgent.graph;
-
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/org/apache/edgent/graph/Connector.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/org/apache/edgent/graph/Connector.java b/api/graph/src/main/java/org/apache/edgent/graph/Connector.java
new file mode 100644
index 0000000..8ffea07
--- /dev/null
+++ b/api/graph/src/main/java/org/apache/edgent/graph/Connector.java
@@ -0,0 +1,143 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.graph;
+
+import java.util.Set;
+
+import org.apache.edgent.oplet.core.Peek;
+
+/**
+ * A {@code Connector} represents an output port of a {@code Vertex}.
+ *
+ * A {@code Connector} supports two methods to add processing for tuples
+ * submitted to the port:
+ * <UL>
+ * <LI>{@link #connect(Vertex, int)} : Connect this to an input port of another
+ * {@code Vertex}. Any number of connections can be made. Any tuple submitted by
+ * the output port will appear on all connections made through this method. For
+ * any tuple {@code t} ordering of appearance across the connected input ports
+ * is not guaranteed.</LI>
+ * <LI>{@link #peek(Peek)} : Insert a peek after the output port and before any
+ * connections made by {@link #connect(Vertex, int)}. Multiple peeks can be
+ * inserted. A tuple {@code t} submitted by the output port will be seen by all
+ * peek oplets. The ordering of the peek is guaranteed such that the peeks
+ * are processed in the order they were added to this {@code Connector} with the
+ * {@code t} being seen first by the first peek added.
+ * </LI>
+ * </UL>
+ * For example with peeks {@code P1,P2,P3} added in that order and connections
+ * {@code C1,C2} added, the graph will be logically:
+ *
+ * <pre>
+ * {@code
+ * -->C1
+ * port-->P1-->P2-->P3--|
+ * -->C2
+ * }
+ * </pre>
+ *
+ * A tuple {@code t} submitted by the port will be peeked at by {@code P1}, then
+ * {@code P2} then {@code P3}. After {@code P3} peeked at the tuple, {@code C1}
+ * and {@code C2} will process the tuple in an arbitrary order.
+ *
+ * @param <T>
+ * Type of the data item produced by the output port
+ */
+public interface Connector<T> {
+
+ /**
+ * Gets the {@code Graph} for this {@code Connector}.
+ *
+ * @return the {@code Graph} for this {@code Connector}.
+ */
+ Graph graph();
+
+ /**
+ * Connect this {@code Connector} to the specified target's input. This
+ * method may be called multiple times to fan out to multiple input ports.
+ * Each tuple submitted to this output port will be processed by all
+ * connections.
+ *
+ * @param target
+ * the {@code Vertex} to connect to
+ * @param inputPort
+ * the index of the target's input port to connect to.
+ */
+ void connect(Vertex<?, T, ?> target, int inputPort);
+
+ /**
+ * Was connect() called on this connector?
+ *
+ * @return true if connected
+ */
+ boolean isConnected();
+
+ /**
+ * Inserts a {@code Peek} oplet between an output port and its
+ * connections. This method may be called multiple times to insert multiple
+ * peeks. Each tuple submitted to this output port will be seen by all peeks
+ * in order of their insertion, starting with the first peek inserted.
+ *
+ * @param <N> Peek oplet type
+ * @param oplet
+ * Oplet to insert.
+ * @return {@code output}
+ */
+ <N extends Peek<T>> Connector<T> peek(N oplet);
+
+ /**
+ * Adds the specified tags to the connector. Adding the same tag
+ * multiple times will not change the result beyond the initial
+ * application. An unconnected connector can be tagged.
+ *
+ * @param values
+ * Tag values.
+ */
+ void tag(String... values);
+
+ /**
+ * Returns the set of tags associated with this connector.
+ *
+ * @return set of tag values.
+ */
+ Set<String> getTags();
+
+ /**
+ * Set the alias for the connector.
+ * <p>
+ * The alias must be unique within the topology.
+ * The alias may be used in various contexts:
+ * </p>
+ * <ul>
+ * <li>Runtime control services for the Connector (stream/outputport)
+ * are registered with this alias.</li>
+ * </ul>
+ *
+ * @param alias the alias
+ * @throws IllegalStateException if an alias has already been set
+ */
+ void alias(String alias);
+
+ /**
+ * Returns the alias for the connector if any.
+ * @return the alias. null if one has not been set.
+ */
+ String getAlias();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/org/apache/edgent/graph/Edge.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/org/apache/edgent/graph/Edge.java b/api/graph/src/main/java/org/apache/edgent/graph/Edge.java
new file mode 100644
index 0000000..05c8f8c
--- /dev/null
+++ b/api/graph/src/main/java/org/apache/edgent/graph/Edge.java
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.graph;
+
+import java.util.Set;
+
+/**
+ * Represents an edge between two Vertices.
+ */
+public interface Edge {
+
+ /**
+ * Returns the source vertex.
+ * @return the source vertex.
+ */
+ Vertex<?, ?, ?> getSource();
+
+ /**
+ * Returns the source output port index.
+ * @return the source output port index.
+ */
+ int getSourceOutputPort();
+
+ /**
+ * Returns the target vertex.
+ * @return the target vertex.
+ */
+ Vertex<?, ?, ?> getTarget();
+
+ /**
+ * Returns the target input port index.
+ * @return the target input port index.
+ */
+ int getTargetInputPort();
+
+ /**
+ * Returns the set of tags associated with this edge.
+ *
+ * @return set of tag values.
+ */
+ Set<String> getTags();
+
+ /**
+ * Returns the alias associated with this edge.
+ *
+ * @return the alias. null if none.
+ */
+ String getAlias();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/org/apache/edgent/graph/Graph.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/org/apache/edgent/graph/Graph.java b/api/graph/src/main/java/org/apache/edgent/graph/Graph.java
new file mode 100644
index 0000000..54748de
--- /dev/null
+++ b/api/graph/src/main/java/org/apache/edgent/graph/Graph.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.graph;
+
+import java.util.Collection;
+
+import org.apache.edgent.function.Predicate;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.oplet.Oplet;
+import org.apache.edgent.oplet.core.Peek;
+import org.apache.edgent.oplet.core.Source;
+
+/**
+ * A generic directed graph of vertices, connectors and edges.
+ * <p>
+ * The graph consists of {@link Vertex} objects, each having
+ * 0 or more input and/or output {@link Connector} objects.
+ * {@link Edge} objects connect an output connector to
+ * an input connector.
+ * <p>
+ * A vertex has an associated {@link Oplet} instance that will be executed
+ * at runtime.
+ */
+public interface Graph {
+
+ /**
+ * Add a new unconnected {@code Vertex} into the graph.
+ * <p>
+ *
+ * @param <N> an Oplet type
+ * @param <C> tuple type of input streams
+ * @param <P> tuple type of output streams
+ * @param oplet the oplet to associate with the new vertex
+ * @param inputs the number of input connectors for the new vertex
+ * @param outputs the number of output connectors for the new vertex
+ * @return the newly created {@code Vertex} for the oplet
+ */
+ <N extends Oplet<C, P>, C, P> Vertex<N, C, P> insert(N oplet, int inputs, int outputs);
+
+ /**
+ * Create a new unconnected {@link Vertex} associated with the
+ * specified source {@link Oplet}.
+ * <p>
+ * The {@code Vertex} for the oplet has 0 input connectors and one output connector.
+ * @param <N> a Source type
+ * @param <P> tuple type
+ * @param oplet the source oplet
+ * @return the output connector for the newly created vertex.
+ */
+ <N extends Source<P>, P> Connector<P> source(N oplet);
+
+ /**
+ * Create a new connected {@link Vertex} associated with the
+ * specified {@link Oplet}.
+ * <p>
+ * The new {@code Vertex} has one input and one output {@code Connector}.
+ * An {@link Edge} is created connecting the specified output connector to
+ * the new vertex's input connector.
+ *
+ * @param <N> an Oplet type
+ * @param <C> tuple type of input streams
+ * @param <P> tuple type of output streams
+ * @param output the output connector to connect to the vertex's input connector
+ * @param oplet the oplet to associate with the new {@code Vertex}
+ * @return the output connector for the new {@code Vertex}
+ */
+ <N extends Oplet<C, P>, C, P> Connector<P> pipe(Connector<C> output, N oplet);
+
+ /**
+ * Insert Peek oplets returned by the specified {@code Supplier} into
+ * the outputs of all of the oplets which satisfy the specified
+ * {@code Predicate} and where the output's {@link Connector#isConnected()}
+ * is true.
+ *
+ * @param supplier
+ * Function which provides a Peek oplet to insert
+ * @param select
+ * Vertex selection Predicate
+ */
+ void peekAll(Supplier<? extends Peek<?>> supplier, Predicate<Vertex<?, ?, ?>> select);
+
+ /**
+ * Return an unmodifiable view of all vertices in this graph.
+ *
+ * @return unmodifiable view of all vertices in this graph
+ */
+ Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> getVertices();
+
+ /**
+ * Return an unmodifiable view of all edges in this graph.
+ * @return the collection
+ */
+ Collection<Edge> getEdges();
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/org/apache/edgent/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/org/apache/edgent/graph/Vertex.java b/api/graph/src/main/java/org/apache/edgent/graph/Vertex.java
new file mode 100644
index 0000000..98d1b83
--- /dev/null
+++ b/api/graph/src/main/java/org/apache/edgent/graph/Vertex.java
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.graph;
+
+import java.util.List;
+
+import org.apache.edgent.oplet.Oplet;
+
+/**
+ * A {@code Vertex} in a graph.
+ * <p>
+ * A {@code Vertex} has an {@link Oplet} instance
+ * that will be executed at runtime and zero or
+ * more input ports and zero or more output ports.
+ * Each output port is represented by a {@link Connector} instance.
+ *
+ * @param <N> the type of the {@code Oplet}
+ * @param <C> Data type the oplet consumes in its input ports.
+ * @param <P> Data type the oplet produces on its output ports.
+ */
+public interface Vertex<N extends Oplet<C, P>, C, P> {
+
+ /**
+ * Get the vertex's {@link Graph}.
+ * @return the graph
+ */
+ Graph graph();
+
+ /**
+ * Get the instance of the oplet that will be executed.
+ *
+ * @return the oplet
+ */
+ N getInstance();
+
+ /**
+ * Get the vertex's collection of output connectors.
+ * @return an immutable collection of the output connectors.
+ */
+ List<? extends Connector<P>> getConnectors();
+
+ /**
+ * Add an output port to the vertex.
+ * @return {@code Connector} representing the output port.
+ */
+ Connector<P> addOutput();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/org/apache/edgent/graph/package-info.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/org/apache/edgent/graph/package-info.java b/api/graph/src/main/java/org/apache/edgent/graph/package-info.java
new file mode 100644
index 0000000..26dbf63
--- /dev/null
+++ b/api/graph/src/main/java/org/apache/edgent/graph/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Low-level graph building API.
+ */
+package org.apache.edgent.graph;
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/test/java/edgent/test/graph/GraphAbstractTest.java
----------------------------------------------------------------------
diff --git a/api/graph/src/test/java/edgent/test/graph/GraphAbstractTest.java b/api/graph/src/test/java/edgent/test/graph/GraphAbstractTest.java
deleted file mode 100644
index 4bf6c49..0000000
--- a/api/graph/src/test/java/edgent/test/graph/GraphAbstractTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.graph;
-
-import org.junit.Before;
-import org.junit.Ignore;
-
-import edgent.graph.Graph;
-
-@Ignore("Abstract class proiding generic graph testing.")
-public abstract class GraphAbstractTest {
-
- private Graph graph;
-
- @Before
- public void setup() {
- graph = createGraph();
- }
-
- protected Graph getGraph() {
- return graph;
- }
-
- protected abstract Graph createGraph();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/test/java/edgent/test/graph/GraphTest.java
----------------------------------------------------------------------
diff --git a/api/graph/src/test/java/edgent/test/graph/GraphTest.java b/api/graph/src/test/java/edgent/test/graph/GraphTest.java
deleted file mode 100644
index da81a50..0000000
--- a/api/graph/src/test/java/edgent/test/graph/GraphTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.graph;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import edgent.function.Consumer;
-import edgent.graph.Edge;
-import edgent.graph.Graph;
-import edgent.graph.Vertex;
-import edgent.oplet.Oplet;
-import edgent.oplet.core.AbstractOplet;
-
-@Ignore
-public abstract class GraphTest extends GraphAbstractTest {
-
- @Test
- public void testEmptyGraph() {
- assertTrue(getGraph().getVertices().isEmpty());
- }
-
- @Test
- public void testGraphAccess() {
- Graph g = getGraph();
-
- TestOp<String, Integer> op = new TestOp<>();
-
- Vertex<TestOp<String, Integer>, String, Integer> v = g.insert(op, 1, 1);
- assertNotNull(v);
- assertSame(op, v.getInstance());
-
- Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> c = getGraph().getVertices();
- assertNotNull(c);
-
- assertFalse(c.isEmpty());
- assertEquals(1, c.size());
-
- assertSame(v, c.toArray()[0]);
-
- assertTrue(getGraph().getEdges().isEmpty());
-
- try {
- c.clear();
- fail("Was able to modify graph collection");
- } catch (UnsupportedOperationException e) {
- // ok - expected
- }
-
-
- TestOp<Integer, Void> op2 = new TestOp<>();
- Vertex<TestOp<Integer, Void>, Integer, Void> v2 = g.insert(op2, 1, 0);
-
- c = getGraph().getVertices();
- assertNotNull(c);
-
- assertFalse(c.isEmpty());
- assertEquals(2, c.size());
-
- assertSame(v, c.toArray()[0]);
- assertSame(v2, c.toArray()[1]);
-
- assertTrue(getGraph().getEdges().isEmpty());
-
- v.getConnectors().get(0).connect(v2, 0);
-
- Collection<Edge> edges = getGraph().getEdges();
- assertFalse(edges.isEmpty());
- assertEquals(1, edges.size());
-
- Edge vtov2 = (Edge) edges.toArray()[0];
- assertSame(v, vtov2.getSource());
- assertEquals(0, vtov2.getSourceOutputPort());
-
- assertSame(v2, vtov2.getTarget());
- assertEquals(0, vtov2.getTargetInputPort());
- }
-
- private static class TestOp<I, O> extends AbstractOplet<I, O> {
-
- @Override
- public void start() {
- }
-
- @Override
- public List<? extends Consumer<I>> getInputs() {
- return null;
- }
-
- @Override
- public void close() throws Exception {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/test/java/org/apache/edgent/test/graph/GraphAbstractTest.java
----------------------------------------------------------------------
diff --git a/api/graph/src/test/java/org/apache/edgent/test/graph/GraphAbstractTest.java b/api/graph/src/test/java/org/apache/edgent/test/graph/GraphAbstractTest.java
new file mode 100644
index 0000000..379c3b3
--- /dev/null
+++ b/api/graph/src/test/java/org/apache/edgent/test/graph/GraphAbstractTest.java
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.graph;
+
+import org.apache.edgent.graph.Graph;
+import org.junit.Before;
+import org.junit.Ignore;
+
+@Ignore("Abstract class proiding generic graph testing.")
+public abstract class GraphAbstractTest {
+
+ private Graph graph;
+
+ @Before
+ public void setup() {
+ graph = createGraph();
+ }
+
+ protected Graph getGraph() {
+ return graph;
+ }
+
+ protected abstract Graph createGraph();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/test/java/org/apache/edgent/test/graph/GraphTest.java
----------------------------------------------------------------------
diff --git a/api/graph/src/test/java/org/apache/edgent/test/graph/GraphTest.java b/api/graph/src/test/java/org/apache/edgent/test/graph/GraphTest.java
new file mode 100644
index 0000000..0b60008
--- /dev/null
+++ b/api/graph/src/test/java/org/apache/edgent/test/graph/GraphTest.java
@@ -0,0 +1,119 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.graph;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.graph.Edge;
+import org.apache.edgent.graph.Graph;
+import org.apache.edgent.graph.Vertex;
+import org.apache.edgent.oplet.Oplet;
+import org.apache.edgent.oplet.core.AbstractOplet;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+public abstract class GraphTest extends GraphAbstractTest {
+
+ @Test
+ public void testEmptyGraph() {
+ assertTrue(getGraph().getVertices().isEmpty());
+ }
+
+ @Test
+ public void testGraphAccess() {
+ Graph g = getGraph();
+
+ TestOp<String, Integer> op = new TestOp<>();
+
+ Vertex<TestOp<String, Integer>, String, Integer> v = g.insert(op, 1, 1);
+ assertNotNull(v);
+ assertSame(op, v.getInstance());
+
+ Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> c = getGraph().getVertices();
+ assertNotNull(c);
+
+ assertFalse(c.isEmpty());
+ assertEquals(1, c.size());
+
+ assertSame(v, c.toArray()[0]);
+
+ assertTrue(getGraph().getEdges().isEmpty());
+
+ try {
+ c.clear();
+ fail("Was able to modify graph collection");
+ } catch (UnsupportedOperationException e) {
+ // ok - expected
+ }
+
+
+ TestOp<Integer, Void> op2 = new TestOp<>();
+ Vertex<TestOp<Integer, Void>, Integer, Void> v2 = g.insert(op2, 1, 0);
+
+ c = getGraph().getVertices();
+ assertNotNull(c);
+
+ assertFalse(c.isEmpty());
+ assertEquals(2, c.size());
+
+ assertSame(v, c.toArray()[0]);
+ assertSame(v2, c.toArray()[1]);
+
+ assertTrue(getGraph().getEdges().isEmpty());
+
+ v.getConnectors().get(0).connect(v2, 0);
+
+ Collection<Edge> edges = getGraph().getEdges();
+ assertFalse(edges.isEmpty());
+ assertEquals(1, edges.size());
+
+ Edge vtov2 = (Edge) edges.toArray()[0];
+ assertSame(v, vtov2.getSource());
+ assertEquals(0, vtov2.getSourceOutputPort());
+
+ assertSame(v2, vtov2.getTarget());
+ assertEquals(0, vtov2.getTargetInputPort());
+ }
+
+ private static class TestOp<I, O> extends AbstractOplet<I, O> {
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public List<? extends Consumer<I>> getInputs() {
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/JobContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/JobContext.java b/api/oplet/src/main/java/edgent/oplet/JobContext.java
deleted file mode 100644
index 6688478..0000000
--- a/api/oplet/src/main/java/edgent/oplet/JobContext.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet;
-
-/**
- * Information about an oplet invocation's job.
- */
-public interface JobContext {
- /**
- * Get the runtime identifier for the job containing this {@link Oplet}.
- * @return The job identifier for the application being executed.
- */
- String getId();
-
- /**
- * Get the name of the job containing this {@link Oplet}.
- * @return The job name for the application being executed.
- */
- String getName();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/Oplet.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/Oplet.java b/api/oplet/src/main/java/edgent/oplet/Oplet.java
deleted file mode 100644
index 913a3f4..0000000
--- a/api/oplet/src/main/java/edgent/oplet/Oplet.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet;
-
-import java.util.List;
-
-import edgent.function.Consumer;
-
-/**
- * Generic API for an oplet that processes streaming data on 0-N input ports
- * and produces 0-M output streams on its output ports. An input port may be
- * connected with any number of streams from other oplets. An output port may
- * connected to any number of input ports on other oplets.
- *
- * @param <I>
- * Data container type for input tuples.
- * @param <O>
- * Data container type for output tuples.
- */
-public interface Oplet<I, O> extends AutoCloseable {
-
- /**
- * Initialize the oplet.
- *
- * @param context the OpletContext
- * @throws Exception on failure
- */
- void initialize(OpletContext<I, O> context) throws Exception;
-
- /**
- * Start the oplet. Oplets must not submit any tuples not derived from
- * input tuples until this method is called.
- */
- void start();
-
- /**
- * Get the input stream data handlers for this oplet. The number of handlers
- * must equal the number of configured input ports. Each tuple
- * arriving on an input port will be sent to the stream handler for that
- * input port.
- *
- * @return list of consumers
- */
- List<? extends Consumer<I>> getInputs();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/OpletContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/OpletContext.java b/api/oplet/src/main/java/edgent/oplet/OpletContext.java
deleted file mode 100644
index ab841ea..0000000
--- a/api/oplet/src/main/java/edgent/oplet/OpletContext.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet;
-
-import java.util.List;
-
-import edgent.execution.services.RuntimeServices;
-import edgent.function.Consumer;
-
-/**
- * Context information for the {@code Oplet}'s invocation context.
- * <P>
- * At execution time an oplet uses its invocation context to retrieve
- * provided {@link #getService(Class) services},
- * {@link #getOutputs() output ports} for tuple submission
- * and {@link #getJobContext() job} information.
- *
- * @param <I> tuple type of input streams
- * @param <O> tuple type of output streams
- */
-public interface OpletContext<I, O> extends RuntimeServices {
-
- /**
- * Get the unique identifier (within the running job)
- * for this oplet.
- * @return unique identifier for this oplet
- */
- String getId();
-
- /**
- * {@inheritDoc}
- * <P>
- * Get a service for this oplet invocation.
- *
- * An invocation of an oplet may get access to services,
- * which provide specific functionality, such as metrics.
- * </P>
- *
- */
- @Override
- <T> T getService(Class<T> serviceClass);
-
- /**
- * Get the number of connected inputs to this oplet.
- * @return number of connected inputs to this oplet.
- */
- int getInputCount();
-
- /**
- * Get the number of connected outputs to this oplet.
- * @return number of connected outputs to this oplet.
- */
- int getOutputCount();
-
- /**
- * Get the mechanism to submit tuples on an output port.
- *
- * @return list of consumers
- */
- List<? extends Consumer<O>> getOutputs();
-
- /**
- * Get the oplet's output port context information.
- * @return list of {@link OutputPortContext}, one for each output port.
- */
- List<OutputPortContext> getOutputContext();
-
- /**
- * Get the job hosting this oplet.
- * @return {@link JobContext} hosting this oplet invocation.
- */
- JobContext getJobContext();
-
- /**
- * Creates a unique name within the context of the current runtime.
- * <p>
- * The default implementation adds a suffix composed of the package
- * name of this interface, the current job and oplet identifiers,
- * all separated by periods ({@code '.'}). Developers should use this
- * method to avoid name clashes when they store or register the name in
- * an external container or registry.
- *
- * @param name name (possibly non-unique)
- * @return unique name within the context of the current runtime.
- */
- String uniquify(String name);
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/OutputPortContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/OutputPortContext.java b/api/oplet/src/main/java/edgent/oplet/OutputPortContext.java
deleted file mode 100644
index 957543a..0000000
--- a/api/oplet/src/main/java/edgent/oplet/OutputPortContext.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet;
-
-/**
- * Information about an oplet output port.
- * <p>
- * Instances are obtained from
- * {@link OpletContext#getOutputContext()}, which returns one entry
- * for each output port of the oplet.
- */
-public interface OutputPortContext {
- /**
- * Get the alias of the output port if any.
- * <p>
- * Callers must be prepared for a null result; an alias is optional.
- *
- * @return the alias; null if none.
- */
- String getAlias();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/AbstractOplet.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/AbstractOplet.java b/api/oplet/src/main/java/edgent/oplet/core/AbstractOplet.java
deleted file mode 100644
index 83dd7ee..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/AbstractOplet.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import edgent.oplet.Oplet;
-import edgent.oplet.OpletContext;
-
-/**
- * Base class for oplets that remembers the {@link OpletContext} supplied
- * at initialization so that subclasses can retrieve it later.
- *
- * @param <I> Data container type for input tuples.
- * @param <O> Data container type for output tuples.
- */
-public abstract class AbstractOplet<I, O> implements Oplet<I, O> {
-
-    /** Context passed to {@link #initialize(OpletContext)}; {@code null} until then. */
-    private OpletContext<I, O> opletContext;
-
-    /**
-     * Records the invocation context for later retrieval through
-     * {@link #getOpletContext()}.
-     *
-     * @param context context of this oplet invocation
-     */
-    @Override
-    public void initialize(OpletContext<I, O> context) {
-        opletContext = context;
-    }
-
-    /**
-     * Returns the context recorded by {@link #initialize(OpletContext)}.
-     *
-     * @return the oplet's context, or {@code null} if {@code initialize}
-     *         has not been called yet
-     */
-    public final OpletContext<I, O> getOpletContext() {
-        return opletContext;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/FanIn.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/FanIn.java b/api/oplet/src/main/java/edgent/oplet/core/FanIn.java
deleted file mode 100644
index 80ac3d5..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/FanIn.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.oplet.OpletContext;
-
-/**
- * FanIn oplet: funnels the tuples of several input ports into one output port.
- * <P>
- * Every tuple that arrives on any input port is passed to
- * {@code receiver.apply(T tuple, Integer index)}. {@code index} is the
- * tuple's input stream's index, where {@code this} is index 0 followed by
- * {@code others} in their order. Whatever non-null value {@code receiver}
- * returns is emitted on the output stream; a null return emits nothing.
- * </P>
- *
- * @param <T> Tuple type of input streams
- * @param <U> Tuple type of output stream
- */
-public class FanIn<T,U> extends AbstractOplet<T, U> {
-    /** Function applied to each incoming tuple together with its port index. */
-    private BiFunction<T, Integer, U> receiver;
-    /** One consumer per input port; created on the first {@code initialize}. */
-    private List<Consumer<T>> iportConsumers;
-    /** Consumer of the single output port (output index 0). */
-    private Consumer<U> destination;
-
-    /**
-     * Creates a FanIn without a receiver; one must be supplied through
-     * {@link #setReceiver(BiFunction)} before tuples are processed.
-     */
-    public FanIn() {
-    }
-
-    /**
-     * Creates a FanIn that processes tuples with {@code receiver}.
-     * @param receiver function to receive tuples
-     */
-    public FanIn(BiFunction<T, Integer, U> receiver) {
-        this.receiver = receiver;
-    }
-
-    /**
-     * Captures the first output as the destination and, on the first call
-     * only, builds an unmodifiable list holding one consumer per input port.
-     * @param context context of this oplet invocation
-     */
-    @Override
-    public void initialize(OpletContext<T, U> context) {
-        super.initialize(context);
-        destination = context.getOutputs().get(0);
-
-        final int portCount = getOpletContext().getInputCount();
-        if (iportConsumers != null) {
-            // The per-port consumers already exist; keep them.
-            return;
-        }
-
-        // Each input port gets its own consumer that invokes the receiver.
-        iportConsumers = new ArrayList<>(portCount);
-        for (int port = 0; port < portCount; port++) {
-            iportConsumers.add(consumer(port));
-        }
-        iportConsumers = Collections.unmodifiableList(iportConsumers);
-    }
-
-    /**
-     * Set the receiver function. Must be called no later than as part
-     * of {@link #initialize(OpletContext)}.
-     * @param receiver function to receive tuples
-     */
-    protected void setReceiver(BiFunction<T, Integer, U> receiver) {
-        this.receiver = receiver;
-    }
-
-    /** Does nothing. */
-    @Override
-    public void start() {
-    }
-
-    /**
-     * Returns the per-port consumers built by {@link #initialize(OpletContext)}.
-     * @return the input consumers; {@code null} before initialization
-     */
-    @Override
-    public List<? extends Consumer<T>> getInputs() {
-        return iportConsumers;
-    }
-
-    /**
-     * Create the Consumer for one input port. The consumer applies the
-     * receiver to each tuple and the port's index and forwards a non-null
-     * result to the output through {@link #submit(Object)}.
-     * @param iportIndex index of the input port
-     * @return the Consumer
-     */
-    protected Consumer<T> consumer(int iportIndex) {
-        return tuple -> {
-            final U emitted = receiver.apply(tuple, iportIndex);
-            if (emitted == null) {
-                return;
-            }
-            submit(emitted);
-        };
-    }
-
-    /**
-     * Returns the consumer of the single output port.
-     * @return the destination captured during initialization
-     */
-    protected Consumer<U> getDestination() {
-        return destination;
-    }
-
-    /**
-     * Submit a tuple to single output.
-     * @param tuple Tuple to be submitted.
-     */
-    protected void submit(U tuple) {
-        final Consumer<U> out = getDestination();
-        out.accept(tuple);
-    }
-
-    /** Does nothing. */
-    @Override
-    public void close() {
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/FanOut.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/FanOut.java b/api/oplet/src/main/java/edgent/oplet/core/FanOut.java
deleted file mode 100644
index 8c7dbcf..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/FanOut.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-
-/**
- * Oplet with a single input that passes every tuple it receives to each of
- * its outputs, in output order.
- *
- * @param <T> Tuple type of the input and of all outputs
- */
-public final class FanOut<T> extends AbstractOplet<T, T> implements Consumer<T> {
-
-    private static final long serialVersionUID = 1L;
-
-    /** Output consumers, captured when {@link #getInputs()} is called. */
-    private List<? extends Consumer<T>> targets;
-    /** Number of entries in {@code targets}; 0 until {@link #getInputs()} is called. */
-    private int n;
-
-    /** Does nothing. */
-    @Override
-    public void start() {
-    }
-
-    /**
-     * Captures the context's current outputs and their count, then exposes
-     * this oplet itself as the only input consumer.
-     * @return a single-element list containing this oplet
-     */
-    @Override
-    public List<? extends Consumer<T>> getInputs() {
-        final List<? extends Consumer<T>> outputs = getOpletContext().getOutputs();
-        targets = outputs;
-        n = outputs.size();
-        return Collections.singletonList(this);
-    }
-
-    /**
-     * Passes {@code tuple} to each captured output in index order. Until
-     * {@link #getInputs()} has captured the outputs the count is 0 and the
-     * tuple is not forwarded anywhere.
-     * @param tuple tuple to distribute
-     */
-    @Override
-    public void accept(T tuple) {
-        int index = 0;
-        while (index < n) {
-            targets.get(index).accept(tuple);
-            index++;
-        }
-    }
-
-    /** Does nothing. */
-    @Override
-    public void close() {
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Peek.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Peek.java b/api/oplet/src/main/java/edgent/oplet/core/Peek.java
deleted file mode 100644
index fd744bb..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Peek.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-/**
- * Pipe oplet that lets a subclass observe each tuple and then forwards that
- * same tuple onto its single output port.
- * <P>
- * {@link #peek(Object)} runs before the tuple is forwarded, so it is
- * intended to be a low cost operation such as increasing a metric.
- * </P>
- *
- * @param <T>
- *            Type of the tuple.
- */
-public abstract class Peek<T> extends Pipe<T, T> {
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * Calls {@link #peek(Object)} with the tuple and then submits the same
-     * tuple to the output. If {@code peek} throws, the tuple is not submitted.
-     * @param tuple the incoming tuple
-     */
-    @Override
-    public final void accept(T tuple) {
-        this.peek(tuple);
-        this.submit(tuple);
-    }
-
-    /**
-     * Observes a tuple before it is forwarded.
-     * @param tuple the tuple about to be submitted to the output
-     */
-    protected abstract void peek(T tuple);
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/PeriodicSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/PeriodicSource.java b/api/oplet/src/main/java/edgent/oplet/core/PeriodicSource.java
deleted file mode 100644
index c3a4988..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/PeriodicSource.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import edgent.execution.mbeans.PeriodMXBean;
-import edgent.execution.services.ControlService;
-import edgent.oplet.OpletContext;
-import edgent.oplet.OutputPortContext;
-
-/**
- * Source oplet whose {@link #fetchTuples()} is invoked at a fixed rate on the
- * {@link ScheduledExecutorService} obtained from the oplet context.
- * <P>
- * The period can be read and changed through the {@link PeriodMXBean}
- * methods implemented here. When a {@link ControlService} is available,
- * {@link #start()} registers this oplet as such a control.
- * </P>
- *
- * @param <T> Tuple type of the output stream
- */
-public abstract class PeriodicSource<T> extends Source<T> implements Runnable, PeriodMXBean {
-
-    // see comment in TStream.TYPE
-    private static final String TSTREAM_TYPE = /*TStream.TYPE*/"stream";
-
-    /** Current period, expressed in {@link #unit}. Guarded by {@code this}. */
-    private long period;
-    /** Unit of {@link #period}. Guarded by {@code this}. */
-    private TimeUnit unit;
-    /** Handle of the active schedule; {@code null} until {@link #start()} has run. */
-    private ScheduledFuture<?> future;
-
-    /**
-     * Creates a source that will fetch tuples every {@code period} {@code unit}s
-     * once started. The arguments are stored as given, without validation.
-     * @param period initial period
-     * @param unit unit of {@code period}
-     */
-    protected PeriodicSource(long period, TimeUnit unit) {
-        this.period = period;
-        this.unit = unit;
-    }
-
-    @Override
-    public void initialize(OpletContext<Void, T> context) {
-        super.initialize(context);
-    }
-
-    /**
-     * Registers this oplet as a {@link PeriodMXBean} control when a
-     * {@link ControlService} is available, then schedules the periodic task
-     * with no initial delay.
-     */
-    @Override
-    public synchronized void start() {
-        ControlService cs = getOpletContext().getService(ControlService.class);
-        // TODO BUG HERE: the control alias needs to be unique across the
-        // entire provider instance (multiple topologies) because the ControlService
-        // is provider-wide, not topology specific.
-        // Scope it with just the jobId. What's going to unregister this control?
-        if (cs != null)
-            cs.registerControl(TSTREAM_TYPE, getOpletContext().uniquify(getClass().getSimpleName()),
-                    getAlias(), PeriodMXBean.class, this);
-        schedule(false);
-    }
-
-    /**
-     * Returns the alias of the first output port.
-     * @return the alias; may be null if the port has none
-     */
-    private String getAlias() {
-        OutputPortContext oc = getOpletContext().getOutputContext().get(0);
-        return oc.getAlias();
-    }
-
-    /**
-     * Schedules {@link #getRunnable()} at a fixed rate using the current
-     * period and unit, and remembers the resulting future.
-     * @param delay if true the first run happens one period from now,
-     *        otherwise immediately
-     */
-    private synchronized void schedule(boolean delay) {
-        future = getOpletContext().getService(ScheduledExecutorService.class).scheduleAtFixedRate(
-                getRunnable(), delay ? getPeriod() : 0, getPeriod(), getUnit());
-    }
-
-    /**
-     * Returns the task handed to the scheduler. This implementation returns
-     * this oplet, whose {@link #run()} calls {@link #fetchTuples()}.
-     * @return the task to execute periodically
-     */
-    protected Runnable getRunnable() {
-        return this;
-    }
-
-    /**
-     * Fetches tuples; called once per period.
-     * @throws Exception on failure; {@link #run()} rethrows it wrapped in a
-     *         {@link RuntimeException}
-     */
-    protected abstract void fetchTuples() throws Exception;
-
-    /**
-     * Calls {@link #fetchTuples()}, wrapping any exception in a
-     * {@link RuntimeException}. Note that per the
-     * {@code ScheduledExecutorService.scheduleAtFixedRate} contract a task
-     * that throws suppresses its subsequent executions.
-     */
-    @Override
-    public void run() {
-        try {
-            fetchTuples();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public synchronized long getPeriod() {
-        return period;
-    }
-
-    @Override
-    public synchronized TimeUnit getUnit() {
-        return unit;
-    }
-
-    /**
-     * Changes the period, keeping the current unit.
-     * @param period new period; must be greater than zero
-     * @throws IllegalArgumentException if {@code period <= 0}
-     */
-    @Override
-    public synchronized void setPeriod(long period) {
-        setPeriod(period, getUnit());
-    }
-
-    /**
-     * Changes the period and unit. If the oplet has been started and either
-     * value differs from the current one, the active schedule is cancelled
-     * (without interrupting a run in progress) and a new one is created whose
-     * first run happens one new period from now. If the oplet has not been
-     * started yet, the values are only recorded and take effect in
-     * {@link #start()}.
-     * @param period new period; must be greater than zero
-     * @param unit unit of {@code period}; must not be null
-     * @throws IllegalArgumentException if {@code period <= 0}
-     * @throws NullPointerException if {@code unit} is null
-     */
-    @Override
-    public synchronized void setPeriod(long period, TimeUnit unit) {
-        if (period <= 0)
-            throw new IllegalArgumentException("period must be greater than zero: " + period);
-        // Reject a null unit up front: otherwise the running schedule would be
-        // cancelled and the reschedule would then fail, leaving nothing scheduled.
-        if (unit == null)
-            throw new NullPointerException("unit");
-        if (this.period != period || this.unit != unit) {
-            // future is null until start() has scheduled the task.
-            final boolean started = future != null;
-            if (started)
-                future.cancel(false);
-            this.period = period;
-            this.unit = unit;
-            if (started)
-                schedule(true);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Pipe.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Pipe.java b/api/oplet/src/main/java/edgent/oplet/core/Pipe.java
deleted file mode 100644
index 04df7a0..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Pipe.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-import edgent.oplet.OpletContext;
-
-/**
- * Oplet with exactly one input port and one output port. The oplet is its
- * own input consumer; subclasses implement {@code accept} and emit results
- * through {@link #submit(Object)}.
- *
- * @param <I>
- *            Data container type for input tuples.
- * @param <O>
- *            Data container type for output tuples.
- */
-public abstract class Pipe<I, O> extends AbstractOplet<I, O> implements Consumer<I> {
-    private static final long serialVersionUID = 1L;
-
-    /** Consumer of the single output port; set by {@link #initialize(OpletContext)}. */
-    private Consumer<O> destination;
-
-    /**
-     * Records the context and captures the first output as the destination.
-     * @param context context of this oplet invocation
-     */
-    @Override
-    public void initialize(OpletContext<I, O> context) {
-        super.initialize(context);
-
-        final List<? extends Consumer<O>> outputs = context.getOutputs();
-        destination = outputs.get(0);
-    }
-
-    /** Does nothing. */
-    @Override
-    public void start() {
-    }
-
-    /**
-     * Exposes this oplet as its only input consumer.
-     * @return a single-element list containing this oplet
-     */
-    @Override
-    public List<Consumer<I>> getInputs() {
-        final Consumer<I> self = this;
-        return Collections.singletonList(self);
-    }
-
-    /**
-     * Returns the consumer of the single output port.
-     * @return the destination; {@code null} before initialization
-     */
-    protected Consumer<O> getDestination() {
-        return destination;
-    }
-
-    /**
-     * Submit a tuple to single output.
-     * @param tuple Tuple to be submitted.
-     */
-    protected void submit(O tuple) {
-        final Consumer<O> out = getDestination();
-        out.accept(tuple);
-    }
-}