You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:46 UTC

[45/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/function/src/test/java/edgent/test/function/FunctionsTest.java
----------------------------------------------------------------------
diff --git a/api/function/src/test/java/edgent/test/function/FunctionsTest.java b/api/function/src/test/java/edgent/test/function/FunctionsTest.java
deleted file mode 100644
index cfce455..0000000
--- a/api/function/src/test/java/edgent/test/function/FunctionsTest.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.function;
-
-import static edgent.function.Functions.identity;
-import static edgent.function.Functions.unpartitioned;
-import static edgent.function.Functions.zero;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-
-import edgent.function.Consumer;
-import edgent.function.Function;
-import edgent.function.Functions;
-import edgent.function.Supplier;
-
-public class FunctionsTest {
-    
-    int add;
-
-    @Test
-    public void testSynchronizedFunction() {
-        
-        Function<Integer,String> f1 = v -> Integer.toString(v);
-        assertSame(f1, Functions.synchronizedFunction(f1));
-        
-        Function<Integer,Integer> f2 = v -> v * 2;
-        assertSame(f2, Functions.synchronizedFunction(f2));
-        
-        int a = 7;
-        Function<Integer,Integer> f3 = v -> v + a;
-        assertSame(f3, Functions.synchronizedFunction(f3));
-        
-        int[] aa = new int[1];
-        Function<Integer,Integer> f4 = v -> v + aa[0];
-        assertNotSame(f4, Functions.synchronizedFunction(f4));
-        
-        aa[0] = 7;
-        int r4 = f4.apply(2);
-        assertEquals(9, r4);
-        aa[0] = 13;
-        r4 = f4.apply(9);
-        assertEquals(22, r4);
-        
-        // verify a synchronized function uses the function reference
-        // as the synchronization
-        Function<Integer,Integer> f5 = new F5();
-        add = 99;
-        try {
-            f5.apply(8);
-            fail("Expected IllegalMonitorStateException");
-        } catch (IllegalMonitorStateException e) {
-            // expected
-        }
-        Function<Integer,Integer> f5s = Functions.synchronizedFunction(f5);
-        assertNotSame(f5, f5s);
-        int r5s = f5s.apply(18);
-        assertEquals(117, r5s);
-        
-        Function<Integer,Integer> f6 = new F6();
-        assertNotSame(f6, Functions.synchronizedFunction(f6));
-    }
-    
-    class F5 implements Function<Integer,Integer> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public Integer apply(Integer value) {
-            notify();
-            return add + value;
-        }
-    }
-    static class F6 implements Function<Integer,Integer>, AutoCloseable {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public Integer apply(Integer value) {
-            return value;
-        }
-        @Override
-        public void close() throws Exception {
-        }
-    }
-    
-    @Test
-    public void testSynchronizedSupplier() {
-        
-        Supplier<Integer> f1 = () -> 3;
-        assertSame(f1, Functions.synchronizedSupplier(f1));
-        
-        int a = 7;
-        Supplier<Integer> f2 = () -> a;
-        assertSame(f2, Functions.synchronizedSupplier(f2));
-        int r2 = f2.get();
-        assertEquals(7, r2);
-
-        
-        int[] aa = new int[1];
-        Supplier<Integer> f3 = () -> aa[0];
-        assertNotSame(f3, Functions.synchronizedSupplier(f3));
-        
-        aa[0] = 7;
-        int r3 = f3.get();
-        assertEquals(7, r3);
-        aa[0] = 13;
-        r3 = f3.get();
-        assertEquals(13, r3);
-        
-        // verify a synchronized function uses the function reference
-        // as the synchronization
-        Supplier<Integer> f4 = new S4();
-        add = 127;
-        try {
-            f4.get();
-            fail("Expected IllegalMonitorStateException");
-        } catch (IllegalMonitorStateException e) {
-            // expected
-        }
-        Supplier<Integer> f4s = Functions.synchronizedSupplier(f4);
-        assertNotSame(f4, f4s);
-        int r4s = f4s.get();
-        assertEquals(127, r4s);
-        
-        Supplier<Integer> f5 = new S5();
-        assertNotSame(f5,  Functions.synchronizedSupplier(f5));
-    }
-    class S4 implements Supplier<Integer> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public Integer get() {
-            notify();
-            return add;
-        }
-    }
-    static class S5 implements Supplier<Integer>, AutoCloseable {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public Integer get() {
-            return 0;
-        }
-        @Override
-        public void close() throws Exception {
-        }
-    }
-    
-    static void consumer(Integer x) {
-    }
-    
-    @Test
-    public void testSynchronizedConsumer() {
-        
-        Consumer<Integer> f1 = v -> {};
-        assertSame(f1, Functions.synchronizedConsumer(f1));
-        
-        int a = 7;
-        Consumer<Integer> f2 = v -> consumer(a + v);
-        assertSame(f2, Functions.synchronizedConsumer(f2));
-        
-        int[] aa = new int[1];
-        Consumer<Integer> f3 = v -> aa[0] = v;
-        assertNotSame(f3, Functions.synchronizedConsumer(f3));
-        
-        aa[0] = 7;
-        f3.accept(85);
-        assertEquals(85, aa[0]);
-        
-        // verify a synchronized function uses the function reference
-        // as the synchronization
-        Consumer<Integer> f4 = new C4();
-        add = 127;
-        try {
-            f4.accept(99);
-            fail("Expected IllegalMonitorStateException");
-        } catch (IllegalMonitorStateException e) {
-            // expected
-        }
-        Consumer<Integer> f4s = Functions.synchronizedConsumer(f4);
-        assertNotSame(f4, f4s);
-        f4s.accept(2421);
-        assertEquals(2421, add);
-        
-        Consumer<Integer> f5 = new C5();
-        assertNotSame(f5,  Functions.synchronizedConsumer(f5));
-    }
-    class C4 implements Consumer<Integer> {
-
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public void accept(Integer value) {
-            notify();
-            add = value;          
-        }
-        
-    }
-    static class C5 implements Consumer<Integer>, AutoCloseable {
-
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public void accept(Integer value) {          
-        }
-        @Override
-        public void close() throws Exception {
-        }        
-    }
-    
-    @Test
-    public void testIdentity() {
-        String s = "hello";
-        assertSame(s, identity().apply(s));
-        
-        Integer i = 42;
-        assertSame(i, identity().apply(i));
-        
-        Object o = new Object();
-        assertSame(o, identity().apply(o));
-    }
-    
-    @Test
-    public void testZero() {
-        String s = "hello";
-        assertEquals(Integer.valueOf(0), zero().apply(s));
-        
-        Integer i = 42;
-        assertEquals(Integer.valueOf(0), zero().apply(i));
-        
-        Object o = new Object();
-        assertEquals(Integer.valueOf(0), zero().apply(o));
-    }
-    @Test
-    public void testUnpartitioned() {
-        String s = "hello";
-        assertEquals(Integer.valueOf(0), unpartitioned().apply(s));
-        
-        Integer i = 42;
-        assertEquals(Integer.valueOf(0), unpartitioned().apply(i));
-        
-        Object o = new Object();
-        assertEquals(Integer.valueOf(0), unpartitioned().apply(o));
-    }
-    @Test
-    public void testAlwaysTrue() {
-        assertTrue(Functions.alwaysTrue().test("hello"));
-        assertTrue(Functions.alwaysTrue().test(42));
-        assertTrue(Functions.alwaysTrue().test(new Object()));
-    }
-    @Test
-    public void testAlwaysFalse() {
-        assertFalse(Functions.alwaysFalse().test("hello"));
-        assertFalse(Functions.alwaysFalse().test(42));
-        assertFalse(Functions.alwaysFalse().test(new Object()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/function/src/test/java/org/apache/edgent/test/function/FunctionsTest.java
----------------------------------------------------------------------
diff --git a/api/function/src/test/java/org/apache/edgent/test/function/FunctionsTest.java b/api/function/src/test/java/org/apache/edgent/test/function/FunctionsTest.java
new file mode 100644
index 0000000..903f071
--- /dev/null
+++ b/api/function/src/test/java/org/apache/edgent/test/function/FunctionsTest.java
@@ -0,0 +1,276 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.function;
+
+import static org.apache.edgent.function.Functions.identity;
+import static org.apache.edgent.function.Functions.unpartitioned;
+import static org.apache.edgent.function.Functions.zero;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Functions;
+import org.apache.edgent.function.Supplier;
+import org.junit.Test;
+
+public class FunctionsTest {
+    
+    int add;
+
+    @Test
+    public void testSynchronizedFunction() {
+        
+        Function<Integer,String> f1 = v -> Integer.toString(v);
+        assertSame(f1, Functions.synchronizedFunction(f1));
+        
+        Function<Integer,Integer> f2 = v -> v * 2;
+        assertSame(f2, Functions.synchronizedFunction(f2));
+        
+        int a = 7;
+        Function<Integer,Integer> f3 = v -> v + a;
+        assertSame(f3, Functions.synchronizedFunction(f3));
+        
+        int[] aa = new int[1];
+        Function<Integer,Integer> f4 = v -> v + aa[0];
+        assertNotSame(f4, Functions.synchronizedFunction(f4));
+        
+        aa[0] = 7;
+        int r4 = f4.apply(2);
+        assertEquals(9, r4);
+        aa[0] = 13;
+        r4 = f4.apply(9);
+        assertEquals(22, r4);
+        
+        // verify a synchronized function uses the function reference
+        // as the synchronization
+        Function<Integer,Integer> f5 = new F5();
+        add = 99;
+        try {
+            f5.apply(8);
+            fail("Expected IllegalMonitorStateException");
+        } catch (IllegalMonitorStateException e) {
+            // expected
+        }
+        Function<Integer,Integer> f5s = Functions.synchronizedFunction(f5);
+        assertNotSame(f5, f5s);
+        int r5s = f5s.apply(18);
+        assertEquals(117, r5s);
+        
+        Function<Integer,Integer> f6 = new F6();
+        assertNotSame(f6, Functions.synchronizedFunction(f6));
+    }
+    
+    class F5 implements Function<Integer,Integer> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Integer apply(Integer value) {
+            notify();
+            return add + value;
+        }
+    }
+    static class F6 implements Function<Integer,Integer>, AutoCloseable {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Integer apply(Integer value) {
+            return value;
+        }
+        @Override
+        public void close() throws Exception {
+        }
+    }
+    
+    @Test
+    public void testSynchronizedSupplier() {
+        
+        Supplier<Integer> f1 = () -> 3;
+        assertSame(f1, Functions.synchronizedSupplier(f1));
+        
+        int a = 7;
+        Supplier<Integer> f2 = () -> a;
+        assertSame(f2, Functions.synchronizedSupplier(f2));
+        int r2 = f2.get();
+        assertEquals(7, r2);
+
+        
+        int[] aa = new int[1];
+        Supplier<Integer> f3 = () -> aa[0];
+        assertNotSame(f3, Functions.synchronizedSupplier(f3));
+        
+        aa[0] = 7;
+        int r3 = f3.get();
+        assertEquals(7, r3);
+        aa[0] = 13;
+        r3 = f3.get();
+        assertEquals(13, r3);
+        
+        // verify a synchronized function uses the function reference
+        // as the synchronization
+        Supplier<Integer> f4 = new S4();
+        add = 127;
+        try {
+            f4.get();
+            fail("Expected IllegalMonitorStateException");
+        } catch (IllegalMonitorStateException e) {
+            // expected
+        }
+        Supplier<Integer> f4s = Functions.synchronizedSupplier(f4);
+        assertNotSame(f4, f4s);
+        int r4s = f4s.get();
+        assertEquals(127, r4s);
+        
+        Supplier<Integer> f5 = new S5();
+        assertNotSame(f5,  Functions.synchronizedSupplier(f5));
+    }
+    class S4 implements Supplier<Integer> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Integer get() {
+            notify();
+            return add;
+        }
+    }
+    static class S5 implements Supplier<Integer>, AutoCloseable {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Integer get() {
+            return 0;
+        }
+        @Override
+        public void close() throws Exception {
+        }
+    }
+    
+    static void consumer(Integer x) {
+    }
+    
+    @Test
+    public void testSynchronizedConsumer() {
+        
+        Consumer<Integer> f1 = v -> {};
+        assertSame(f1, Functions.synchronizedConsumer(f1));
+        
+        int a = 7;
+        Consumer<Integer> f2 = v -> consumer(a + v);
+        assertSame(f2, Functions.synchronizedConsumer(f2));
+        
+        int[] aa = new int[1];
+        Consumer<Integer> f3 = v -> aa[0] = v;
+        assertNotSame(f3, Functions.synchronizedConsumer(f3));
+        
+        aa[0] = 7;
+        f3.accept(85);
+        assertEquals(85, aa[0]);
+        
+        // verify a synchronized function uses the function reference
+        // as the synchronization
+        Consumer<Integer> f4 = new C4();
+        add = 127;
+        try {
+            f4.accept(99);
+            fail("Expected IllegalMonitorStateException");
+        } catch (IllegalMonitorStateException e) {
+            // expected
+        }
+        Consumer<Integer> f4s = Functions.synchronizedConsumer(f4);
+        assertNotSame(f4, f4s);
+        f4s.accept(2421);
+        assertEquals(2421, add);
+        
+        Consumer<Integer> f5 = new C5();
+        assertNotSame(f5,  Functions.synchronizedConsumer(f5));
+    }
+    class C4 implements Consumer<Integer> {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void accept(Integer value) {
+            notify();
+            add = value;          
+        }
+        
+    }
+    static class C5 implements Consumer<Integer>, AutoCloseable {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void accept(Integer value) {          
+        }
+        @Override
+        public void close() throws Exception {
+        }        
+    }
+    
+    @Test
+    public void testIdentity() {
+        String s = "hello";
+        assertSame(s, identity().apply(s));
+        
+        Integer i = 42;
+        assertSame(i, identity().apply(i));
+        
+        Object o = new Object();
+        assertSame(o, identity().apply(o));
+    }
+    
+    @Test
+    public void testZero() {
+        String s = "hello";
+        assertEquals(Integer.valueOf(0), zero().apply(s));
+        
+        Integer i = 42;
+        assertEquals(Integer.valueOf(0), zero().apply(i));
+        
+        Object o = new Object();
+        assertEquals(Integer.valueOf(0), zero().apply(o));
+    }
+    @Test
+    public void testUnpartitioned() {
+        String s = "hello";
+        assertEquals(Integer.valueOf(0), unpartitioned().apply(s));
+        
+        Integer i = 42;
+        assertEquals(Integer.valueOf(0), unpartitioned().apply(i));
+        
+        Object o = new Object();
+        assertEquals(Integer.valueOf(0), unpartitioned().apply(o));
+    }
+    @Test
+    public void testAlwaysTrue() {
+        assertTrue(Functions.alwaysTrue().test("hello"));
+        assertTrue(Functions.alwaysTrue().test(42));
+        assertTrue(Functions.alwaysTrue().test(new Object()));
+    }
+    @Test
+    public void testAlwaysFalse() {
+        assertFalse(Functions.alwaysFalse().test("hello"));
+        assertFalse(Functions.alwaysFalse().test(42));
+        assertFalse(Functions.alwaysFalse().test(new Object()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/edgent/graph/Connector.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/edgent/graph/Connector.java b/api/graph/src/main/java/edgent/graph/Connector.java
deleted file mode 100644
index 9b54bfa..0000000
--- a/api/graph/src/main/java/edgent/graph/Connector.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.graph;
-
-import java.util.Set;
-
-import edgent.oplet.core.Peek;
-
-/**
- * A {@code Connector} represents an output port of a {@code Vertex}.
- * 
- * A {@code Connector} supports two methods to add processing for tuples
- * submitted to the port:
- * <UL>
- * <LI>{@link #connect(Vertex, int)} : Connect this to an input port of another
- * {@code Vertex}. Any number of connections can be made. Any tuple submitted by
- * the output port will appear on all connections made through this method. For
- * any tuple {@code t} ordering of appearance across the connected input ports
- * is not guaranteed.</LI>
- * <LI>{@link #peek(Peek)} : Insert a peek after the output port and before any
- * connections made by {@link #connect(Vertex, int)}. Multiple peeks can be
- * inserted. A tuple {@code t} submitted by the output port will be seen by all
- * peek oplets. The ordering of the peek is guaranteed such that the peeks
- * are processed in the order they were added to this {@code Connector} with the
- * {@code t} being seen first by the first peek added.
- * <LI>
- * </UL>
- * For example with peeks {@code P1,P2,P3} added in that order and connections
- * {@code C1,C2} added, the graph will be logically:
- *
- * <pre>
- * {@code
- *                      -->C1
- * port-->P1-->P2-->P3--|
- *                      -->C2
- * }
- * </pre>
- * 
- * A tuple {@code t} submitted by the port will be peeked at by {@code P1}, then
- * {@code P2} then {@code P3}. After {@code P3} peeked at the tuple, {@code C1}
- * and {@code C2} will process the tuple in an arbitrary order.
- * 
- * @param <T>
- *            Type of the data item produced by the output port
- */
-public interface Connector<T> {
-	
-	/**
-	 * Gets the {@code Graph} for this {@code Connector}.
-	 * 
-	 * @return the {@code Graph} for this {@code Connector}.
-	 */
-    Graph graph();
-
-    /**
-     * Connect this {@code Connector} to the specified target's input. This
-     * method may be called multiple times to fan out to multiple input ports.
-     * Each tuple submitted to this output port will be processed by all
-     * connections.
-     * 
-     * @param target
-     *            the {@code Vertex} to connect to
-     * @param inputPort
-     *            the index of the target's input port to connect to.
-     */
-    void connect(Vertex<?, T, ?> target, int inputPort);
-    
-	/**
-	 * Was connect() called on this connector?
-	 * 
-	 * @return true if connected
-	 */
-    boolean isConnected();
-
-	/**
-     * Inserts a {@code Peek} oplet between an output port and its
-     * connections. This method may be called multiple times to insert multiple
-     * peeks. Each tuple submitted to this output port will be seen by all peeks
-     * in order of their insertion, starting with the first peek inserted.
-     *
-     * @param <N> Peek oplet type
-     * @param oplet
-     *            Oplet to insert.
-     * @return {@code output}
-     */
-	<N extends Peek<T>> Connector<T> peek(N oplet);
-
-    /**
-     * Adds the specified tags to the connector.  Adding the same tag 
-     * multiple times will not change the result beyond the initial 
-     * application. An unconnected connector can be tagged.
-     * 
-     * @param values
-     *            Tag values.
-     */
-    void tag(String... values);
-
-    /**
-     * Returns the set of tags associated with this connector.
-     * 
-     * @return set of tag values.
-     */
-    Set<String> getTags();
-    
-    /**
-     * Set the alias for the connector.
-     * <p>
-     * The alias must be unique within the topology.
-     * The alias may be used in various contexts:
-     * </p>
-     * <ul>
-     * <li>Runtime control services for the Connector (stream/outputport)
-     * are registered with this alias.</li>
-     * </ul>
-     * 
-     * @param alias the alias
-     * @throws IllegalStateException if the an alias has already been set
-     */
-    void alias(String alias);
-    
-    /**
-     * Returns the alias for the connector if any.
-     * @return the alias. null if one has not be set.
-     */
-    String getAlias();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/edgent/graph/Edge.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/edgent/graph/Edge.java b/api/graph/src/main/java/edgent/graph/Edge.java
deleted file mode 100644
index 08dc0c7..0000000
--- a/api/graph/src/main/java/edgent/graph/Edge.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.graph;
-
-import java.util.Set;
-
-/**
- * Represents an edge between two Vertices.
- */
-public interface Edge {
-    
-    /**
-     * Returns the source vertex.
-     * @return the source vertex.
-     */
-	Vertex<?, ?, ?> getSource();
-
-    /**
-     * Returns the source output port index.
-     * @return the source output port index.
-     */
-	int getSourceOutputPort();
-	
-    /**
-     * Returns the target vertex.
-     * @return the target vertex.
-     */
-	Vertex<?, ?, ?> getTarget();
-
-    /**
-     * Returns the target input port index.
-     * @return the target input port index.
-     */
-	int getTargetInputPort();
-	
-    /**
-     * Returns the set of tags associated with this edge.
-     * 
-     * @return set of tag values.
-     */
-    Set<String> getTags(); 
-    
-    /**
-     * Returns the alias associated with this edge.
-     * 
-     * @return the alias. null if none.
-     */
-    String getAlias(); 
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/edgent/graph/Graph.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/edgent/graph/Graph.java b/api/graph/src/main/java/edgent/graph/Graph.java
deleted file mode 100644
index bb9c008..0000000
--- a/api/graph/src/main/java/edgent/graph/Graph.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.graph;
-
-import java.util.Collection;
-
-import edgent.function.Predicate;
-import edgent.function.Supplier;
-import edgent.oplet.Oplet;
-import edgent.oplet.core.Peek;
-import edgent.oplet.core.Source;
-
-/**
- * A generic directed graph of vertices, connectors and edges.
- * <p>
- * The graph consists of {@link Vertex} objects, each having
- * 0 or more input and/or output {@link Connector} objects.
- * {@link Edge} objects connect an output connector to
- * an input connector.
- * <p>
- * A vertex has an associated {@link Oplet} instance that will be executed
- * at runtime.
- */
-public interface Graph {
-
-    /**
-     * Add a new unconnected {@code Vertex} into the graph.
-     * <p>
-     * 
-     * @param <N> an Oplet type
-     * @param <C> tuple type of input streams
-     * @param <P> tuple type of output streams
-     * @param oplet the oplet to associate with the new vertex
-     * @param inputs the number of input connectors for the new vertex
-     * @param outputs the number of output connectors for the new vertex
-     * @return the newly created {@code Vertex} for the oplet
-     */
-    <N extends Oplet<C, P>, C, P> Vertex<N, C, P> insert(N oplet, int inputs, int outputs);
-
-    /**
-     * Create a new unconnected {@link Vertex} associated with the
-     * specified source {@link Oplet}.
-     * <p>
-     * The {@code Vertex} for the oplet has 0 input connectors and one output connector.
-     * @param <N> a Source type
-     * @param <P> tuple type
-     * @param oplet the source oplet
-     * @return the output connector for the newly created vertex.
-     */
-    <N extends Source<P>, P> Connector<P> source(N oplet);
-
-    /**
-     * Create a new connected {@link Vertex} associated with the
-     * specified {@link Oplet}.
-     * <p>
-     * The new {@code Vertex} has one input and one output {@code Connector}.
-     * An {@link Edge} is created connecting the specified output connector to
-     * the new vertice's input connector.
-     * 
-     * @param <N> an Oplet type
-     * @param <C> tuple type of input streams
-     * @param <P> tuple type of output streams
-     * @param output the output connector to connect to the vertice's input connector
-     * @param oplet the oplet to associate with the new {@code Vertex}
-     * @return the output connector for the new {@code Vertex}
-     */
-	<N extends Oplet<C, P>, C, P> Connector<P> pipe(Connector<C> output, N oplet);
-
-    /**
-     * Insert Peek oplets returned by the specified {@code Supplier} into 
-     * the outputs of all of the oplets which satisfy the specified 
-     * {@code Predicate} and where the output's {@link Connector#isConnected()}
-     * is true.
-     * 
-     * @param supplier
-     *            Function which provides a Peek oplet to insert
-     * @param select
-     *            Vertex selection Predicate
-     */
-    void peekAll(Supplier<? extends Peek<?>> supplier, Predicate<Vertex<?, ?, ?>> select);
-
-    /**
-     * Return an unmodifiable view of all vertices in this graph.
-     * 
-     * @return unmodifiable view of all vertices in this graph
-     */
-    Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> getVertices();
-    
-    /**
-     * Return an unmodifiable view of all edges in this graph.
-     * @return the collection
-     */
-    Collection<Edge> getEdges();
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/edgent/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/edgent/graph/Vertex.java b/api/graph/src/main/java/edgent/graph/Vertex.java
deleted file mode 100644
index 4e05333..0000000
--- a/api/graph/src/main/java/edgent/graph/Vertex.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.graph;
-
-import java.util.List;
-
-import edgent.oplet.Oplet;
-
-/**
- * A {@code Vertex} in a graph.
- * <p>
- * A {@code Vertex} has an {@link Oplet} instance
- * that will be executed at runtime and zero or
- * more input ports and zero or more output ports.
- * Each output port is represented by a {@link Connector} instance.
- * 
- * @param <N> the type of the {@code Oplet}
- * @param <C> Data type the oplet consumes in its input ports.
- * @param <P> Data type the oplet produces on its output ports.
- */
-public interface Vertex<N extends Oplet<C, P>, C, P> {
-
-	/**
-	 * Get the vertice's {@link Graph}.
-	 * @return the graph
-	 */
-    Graph graph();
-
-    /**
-     * Get the instance of the oplet that will be executed.
-     * 
-     * @return the oplet
-     */
-    N getInstance();
-
-    /**
-     * Get the vertice's collection of output connectors.
-     * @return an immutable collection of the output connectors.
-     */
-    List<? extends Connector<P>> getConnectors();
-
-    /**
-     * Add an output port to the vertex.
-     * @return {@code Connector} representing the output port.
-     */
-    Connector<P> addOutput();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/edgent/graph/package-info.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/edgent/graph/package-info.java b/api/graph/src/main/java/edgent/graph/package-info.java
deleted file mode 100644
index d8c0af8..0000000
--- a/api/graph/src/main/java/edgent/graph/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Low-level graph building API.
- */
-package edgent.graph;
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/org/apache/edgent/graph/Connector.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/org/apache/edgent/graph/Connector.java b/api/graph/src/main/java/org/apache/edgent/graph/Connector.java
new file mode 100644
index 0000000..8ffea07
--- /dev/null
+++ b/api/graph/src/main/java/org/apache/edgent/graph/Connector.java
@@ -0,0 +1,143 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.graph;
+
+import java.util.Set;
+
+import org.apache.edgent.oplet.core.Peek;
+
+/**
+ * A {@code Connector} represents an output port of a {@code Vertex}.
+ * 
+ * A {@code Connector} supports two methods to add processing for tuples
+ * submitted to the port:
+ * <UL>
+ * <LI>{@link #connect(Vertex, int)} : Connect this to an input port of another
+ * {@code Vertex}. Any number of connections can be made. Any tuple submitted by
+ * the output port will appear on all connections made through this method. For
+ * any tuple {@code t} ordering of appearance across the connected input ports
+ * is not guaranteed.</LI>
+ * <LI>{@link #peek(Peek)} : Insert a peek after the output port and before any
+ * connections made by {@link #connect(Vertex, int)}. Multiple peeks can be
+ * inserted. A tuple {@code t} submitted by the output port will be seen by all
+ * peek oplets. The ordering of the peek is guaranteed such that the peeks
+ * are processed in the order they were added to this {@code Connector} with the
+ * {@code t} being seen first by the first peek added.
+ * <LI>
+ * </UL>
+ * For example with peeks {@code P1,P2,P3} added in that order and connections
+ * {@code C1,C2} added, the graph will be logically:
+ *
+ * <pre>
+ * {@code
+ *                      -->C1
+ * port-->P1-->P2-->P3--|
+ *                      -->C2
+ * }
+ * </pre>
+ * 
+ * A tuple {@code t} submitted by the port will be peeked at by {@code P1}, then
+ * {@code P2} then {@code P3}. After {@code P3} peeked at the tuple, {@code C1}
+ * and {@code C2} will process the tuple in an arbitrary order.
+ * 
+ * @param <T>
+ *            Type of the data item produced by the output port
+ */
+public interface Connector<T> {
+	
+	/**
+	 * Gets the {@code Graph} for this {@code Connector}.
+	 * 
+	 * @return the {@code Graph} for this {@code Connector}.
+	 */
+    Graph graph();
+
+    /**
+     * Connect this {@code Connector} to the specified target's input. This
+     * method may be called multiple times to fan out to multiple input ports.
+     * Each tuple submitted to this output port will be processed by all
+     * connections.
+     * 
+     * @param target
+     *            the {@code Vertex} to connect to
+     * @param inputPort
+     *            the index of the target's input port to connect to.
+     */
+    void connect(Vertex<?, T, ?> target, int inputPort);
+    
+	/**
+	 * Was connect() called on this connector?
+	 * 
+	 * @return true if connected
+	 */
+    boolean isConnected();
+
+	/**
+     * Inserts a {@code Peek} oplet between an output port and its
+     * connections. This method may be called multiple times to insert multiple
+     * peeks. Each tuple submitted to this output port will be seen by all peeks
+     * in order of their insertion, starting with the first peek inserted.
+     *
+     * @param <N> Peek oplet type
+     * @param oplet
+     *            Oplet to insert.
+     * @return {@code output}
+     */
+	<N extends Peek<T>> Connector<T> peek(N oplet);
+
+    /**
+     * Adds the specified tags to the connector.  Adding the same tag 
+     * multiple times will not change the result beyond the initial 
+     * application. An unconnected connector can be tagged.
+     * 
+     * @param values
+     *            Tag values.
+     */
+    void tag(String... values);
+
+    /**
+     * Returns the set of tags associated with this connector.
+     * 
+     * @return set of tag values.
+     */
+    Set<String> getTags();
+    
+    /**
+     * Set the alias for the connector.
+     * <p>
+     * The alias must be unique within the topology.
+     * The alias may be used in various contexts:
+     * </p>
+     * <ul>
+     * <li>Runtime control services for the Connector (stream/outputport)
+     * are registered with this alias.</li>
+     * </ul>
+     * 
+     * @param alias the alias
+     * @throws IllegalStateException if the an alias has already been set
+     */
+    void alias(String alias);
+    
+    /**
+     * Returns the alias for the connector if any.
+     * @return the alias. null if one has not be set.
+     */
+    String getAlias();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/org/apache/edgent/graph/Edge.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/org/apache/edgent/graph/Edge.java b/api/graph/src/main/java/org/apache/edgent/graph/Edge.java
new file mode 100644
index 0000000..05c8f8c
--- /dev/null
+++ b/api/graph/src/main/java/org/apache/edgent/graph/Edge.java
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.graph;
+
+import java.util.Set;
+
+/**
+ * Represents an edge between two Vertices.
+ */
+public interface Edge {
+    
+    /**
+     * Returns the source vertex.
+     * @return the source vertex.
+     */
+	Vertex<?, ?, ?> getSource();
+
+    /**
+     * Returns the source output port index.
+     * @return the source output port index.
+     */
+	int getSourceOutputPort();
+	
+    /**
+     * Returns the target vertex.
+     * @return the target vertex.
+     */
+	Vertex<?, ?, ?> getTarget();
+
+    /**
+     * Returns the target input port index.
+     * @return the target input port index.
+     */
+	int getTargetInputPort();
+	
+    /**
+     * Returns the set of tags associated with this edge.
+     * 
+     * @return set of tag values.
+     */
+    Set<String> getTags(); 
+    
+    /**
+     * Returns the alias associated with this edge.
+     * 
+     * @return the alias. null if none.
+     */
+    String getAlias(); 
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/org/apache/edgent/graph/Graph.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/org/apache/edgent/graph/Graph.java b/api/graph/src/main/java/org/apache/edgent/graph/Graph.java
new file mode 100644
index 0000000..54748de
--- /dev/null
+++ b/api/graph/src/main/java/org/apache/edgent/graph/Graph.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.graph;
+
+import java.util.Collection;
+
+import org.apache.edgent.function.Predicate;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.oplet.Oplet;
+import org.apache.edgent.oplet.core.Peek;
+import org.apache.edgent.oplet.core.Source;
+
+/**
+ * A generic directed graph of vertices, connectors and edges.
+ * <p>
+ * The graph consists of {@link Vertex} objects, each having
+ * 0 or more input and/or output {@link Connector} objects.
+ * {@link Edge} objects connect an output connector to
+ * an input connector.
+ * <p>
+ * A vertex has an associated {@link Oplet} instance that will be executed
+ * at runtime.
+ */
+public interface Graph {
+
+    /**
+     * Add a new unconnected {@code Vertex} into the graph.
+     * <p>
+     * 
+     * @param <N> an Oplet type
+     * @param <C> tuple type of input streams
+     * @param <P> tuple type of output streams
+     * @param oplet the oplet to associate with the new vertex
+     * @param inputs the number of input connectors for the new vertex
+     * @param outputs the number of output connectors for the new vertex
+     * @return the newly created {@code Vertex} for the oplet
+     */
+    <N extends Oplet<C, P>, C, P> Vertex<N, C, P> insert(N oplet, int inputs, int outputs);
+
+    /**
+     * Create a new unconnected {@link Vertex} associated with the
+     * specified source {@link Oplet}.
+     * <p>
+     * The {@code Vertex} for the oplet has 0 input connectors and one output connector.
+     * @param <N> a Source type
+     * @param <P> tuple type
+     * @param oplet the source oplet
+     * @return the output connector for the newly created vertex.
+     */
+    <N extends Source<P>, P> Connector<P> source(N oplet);
+
+    /**
+     * Create a new connected {@link Vertex} associated with the
+     * specified {@link Oplet}.
+     * <p>
+     * The new {@code Vertex} has one input and one output {@code Connector}.
+     * An {@link Edge} is created connecting the specified output connector to
+     * the new vertice's input connector.
+     * 
+     * @param <N> an Oplet type
+     * @param <C> tuple type of input streams
+     * @param <P> tuple type of output streams
+     * @param output the output connector to connect to the vertice's input connector
+     * @param oplet the oplet to associate with the new {@code Vertex}
+     * @return the output connector for the new {@code Vertex}
+     */
+	<N extends Oplet<C, P>, C, P> Connector<P> pipe(Connector<C> output, N oplet);
+
+    /**
+     * Insert Peek oplets returned by the specified {@code Supplier} into 
+     * the outputs of all of the oplets which satisfy the specified 
+     * {@code Predicate} and where the output's {@link Connector#isConnected()}
+     * is true.
+     * 
+     * @param supplier
+     *            Function which provides a Peek oplet to insert
+     * @param select
+     *            Vertex selection Predicate
+     */
+    void peekAll(Supplier<? extends Peek<?>> supplier, Predicate<Vertex<?, ?, ?>> select);
+
+    /**
+     * Return an unmodifiable view of all vertices in this graph.
+     * 
+     * @return unmodifiable view of all vertices in this graph
+     */
+    Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> getVertices();
+    
+    /**
+     * Return an unmodifiable view of all edges in this graph.
+     * @return the collection
+     */
+    Collection<Edge> getEdges();
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/org/apache/edgent/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/org/apache/edgent/graph/Vertex.java b/api/graph/src/main/java/org/apache/edgent/graph/Vertex.java
new file mode 100644
index 0000000..98d1b83
--- /dev/null
+++ b/api/graph/src/main/java/org/apache/edgent/graph/Vertex.java
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.graph;
+
+import java.util.List;
+
+import org.apache.edgent.oplet.Oplet;
+
+/**
+ * A {@code Vertex} in a graph.
+ * <p>
+ * A {@code Vertex} has an {@link Oplet} instance
+ * that will be executed at runtime and zero or
+ * more input ports and zero or more output ports.
+ * Each output port is represented by a {@link Connector} instance.
+ * 
+ * @param <N> the type of the {@code Oplet}
+ * @param <C> Data type the oplet consumes in its input ports.
+ * @param <P> Data type the oplet produces on its output ports.
+ */
+public interface Vertex<N extends Oplet<C, P>, C, P> {
+
+	/**
+	 * Get the vertice's {@link Graph}.
+	 * @return the graph
+	 */
+    Graph graph();
+
+    /**
+     * Get the instance of the oplet that will be executed.
+     * 
+     * @return the oplet
+     */
+    N getInstance();
+
+    /**
+     * Get the vertice's collection of output connectors.
+     * @return an immutable collection of the output connectors.
+     */
+    List<? extends Connector<P>> getConnectors();
+
+    /**
+     * Add an output port to the vertex.
+     * @return {@code Connector} representing the output port.
+     */
+    Connector<P> addOutput();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/main/java/org/apache/edgent/graph/package-info.java
----------------------------------------------------------------------
diff --git a/api/graph/src/main/java/org/apache/edgent/graph/package-info.java b/api/graph/src/main/java/org/apache/edgent/graph/package-info.java
new file mode 100644
index 0000000..26dbf63
--- /dev/null
+++ b/api/graph/src/main/java/org/apache/edgent/graph/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Low-level graph building API.
+ */
+package org.apache.edgent.graph;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/test/java/edgent/test/graph/GraphAbstractTest.java
----------------------------------------------------------------------
diff --git a/api/graph/src/test/java/edgent/test/graph/GraphAbstractTest.java b/api/graph/src/test/java/edgent/test/graph/GraphAbstractTest.java
deleted file mode 100644
index 4bf6c49..0000000
--- a/api/graph/src/test/java/edgent/test/graph/GraphAbstractTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.graph;
-
-import org.junit.Before;
-import org.junit.Ignore;
-
-import edgent.graph.Graph;
-
-@Ignore("Abstract class proiding generic graph testing.")
-public abstract class GraphAbstractTest {
-
-    private Graph graph;
-
-    @Before
-    public void setup() {
-        graph = createGraph();
-    }
-
-    protected Graph getGraph() {
-        return graph;
-    }
-
-    protected abstract Graph createGraph();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/test/java/edgent/test/graph/GraphTest.java
----------------------------------------------------------------------
diff --git a/api/graph/src/test/java/edgent/test/graph/GraphTest.java b/api/graph/src/test/java/edgent/test/graph/GraphTest.java
deleted file mode 100644
index da81a50..0000000
--- a/api/graph/src/test/java/edgent/test/graph/GraphTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.graph;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import edgent.function.Consumer;
-import edgent.graph.Edge;
-import edgent.graph.Graph;
-import edgent.graph.Vertex;
-import edgent.oplet.Oplet;
-import edgent.oplet.core.AbstractOplet;
-
-@Ignore
-public abstract class GraphTest extends GraphAbstractTest {
-
-    @Test
-    public void testEmptyGraph() {
-        assertTrue(getGraph().getVertices().isEmpty());
-    }
-
-    @Test
-    public void testGraphAccess() {
-        Graph g = getGraph();
-        
-        TestOp<String, Integer> op = new TestOp<>();
-
-        Vertex<TestOp<String, Integer>, String, Integer> v = g.insert(op, 1, 1);
-        assertNotNull(v);
-        assertSame(op, v.getInstance());
-
-        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> c = getGraph().getVertices();
-        assertNotNull(c);
-
-        assertFalse(c.isEmpty());
-        assertEquals(1, c.size());
-
-        assertSame(v, c.toArray()[0]);
-        
-        assertTrue(getGraph().getEdges().isEmpty());
-
-        try {
-            c.clear();
-            fail("Was able to modify graph collection");
-        } catch (UnsupportedOperationException e) {
-            // ok - expected
-        }
-        
-        
-        TestOp<Integer, Void> op2 = new TestOp<>();
-        Vertex<TestOp<Integer, Void>, Integer, Void> v2 = g.insert(op2, 1, 0);
-        
-        c = getGraph().getVertices();
-        assertNotNull(c);
-
-        assertFalse(c.isEmpty());
-        assertEquals(2, c.size());
-
-        assertSame(v, c.toArray()[0]);
-        assertSame(v2, c.toArray()[1]);
-        
-        assertTrue(getGraph().getEdges().isEmpty());
-        
-        v.getConnectors().get(0).connect(v2, 0);
-        
-        Collection<Edge> edges = getGraph().getEdges();
-        assertFalse(edges.isEmpty());
-        assertEquals(1, edges.size());
-        
-        Edge vtov2 = (Edge) edges.toArray()[0];
-        assertSame(v, vtov2.getSource());
-        assertEquals(0, vtov2.getSourceOutputPort());
-        
-        assertSame(v2, vtov2.getTarget());
-        assertEquals(0, vtov2.getTargetInputPort());
-    }
-
-    private static class TestOp<I, O> extends AbstractOplet<I, O> {
-
-        @Override
-        public void start() {
-        }
-
-        @Override
-        public List<? extends Consumer<I>> getInputs() {
-            return null;
-        }
-
-        @Override
-        public void close() throws Exception {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/test/java/org/apache/edgent/test/graph/GraphAbstractTest.java
----------------------------------------------------------------------
diff --git a/api/graph/src/test/java/org/apache/edgent/test/graph/GraphAbstractTest.java b/api/graph/src/test/java/org/apache/edgent/test/graph/GraphAbstractTest.java
new file mode 100644
index 0000000..379c3b3
--- /dev/null
+++ b/api/graph/src/test/java/org/apache/edgent/test/graph/GraphAbstractTest.java
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.graph;
+
+import org.apache.edgent.graph.Graph;
+import org.junit.Before;
+import org.junit.Ignore;
+
+@Ignore("Abstract class proiding generic graph testing.")
+public abstract class GraphAbstractTest {
+
+    private Graph graph;
+
+    @Before
+    public void setup() {
+        graph = createGraph();
+    }
+
+    protected Graph getGraph() {
+        return graph;
+    }
+
+    protected abstract Graph createGraph();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/graph/src/test/java/org/apache/edgent/test/graph/GraphTest.java
----------------------------------------------------------------------
diff --git a/api/graph/src/test/java/org/apache/edgent/test/graph/GraphTest.java b/api/graph/src/test/java/org/apache/edgent/test/graph/GraphTest.java
new file mode 100644
index 0000000..0b60008
--- /dev/null
+++ b/api/graph/src/test/java/org/apache/edgent/test/graph/GraphTest.java
@@ -0,0 +1,119 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.graph;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.graph.Edge;
+import org.apache.edgent.graph.Graph;
+import org.apache.edgent.graph.Vertex;
+import org.apache.edgent.oplet.Oplet;
+import org.apache.edgent.oplet.core.AbstractOplet;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+public abstract class GraphTest extends GraphAbstractTest {
+
+    @Test
+    public void testEmptyGraph() {
+        assertTrue(getGraph().getVertices().isEmpty());
+    }
+
+    @Test
+    public void testGraphAccess() {
+        Graph g = getGraph();
+        
+        TestOp<String, Integer> op = new TestOp<>();
+
+        Vertex<TestOp<String, Integer>, String, Integer> v = g.insert(op, 1, 1);
+        assertNotNull(v);
+        assertSame(op, v.getInstance());
+
+        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> c = getGraph().getVertices();
+        assertNotNull(c);
+
+        assertFalse(c.isEmpty());
+        assertEquals(1, c.size());
+
+        assertSame(v, c.toArray()[0]);
+        
+        assertTrue(getGraph().getEdges().isEmpty());
+
+        try {
+            c.clear();
+            fail("Was able to modify graph collection");
+        } catch (UnsupportedOperationException e) {
+            // ok - expected
+        }
+        
+        
+        TestOp<Integer, Void> op2 = new TestOp<>();
+        Vertex<TestOp<Integer, Void>, Integer, Void> v2 = g.insert(op2, 1, 0);
+        
+        c = getGraph().getVertices();
+        assertNotNull(c);
+
+        assertFalse(c.isEmpty());
+        assertEquals(2, c.size());
+
+        assertSame(v, c.toArray()[0]);
+        assertSame(v2, c.toArray()[1]);
+        
+        assertTrue(getGraph().getEdges().isEmpty());
+        
+        v.getConnectors().get(0).connect(v2, 0);
+        
+        Collection<Edge> edges = getGraph().getEdges();
+        assertFalse(edges.isEmpty());
+        assertEquals(1, edges.size());
+        
+        Edge vtov2 = (Edge) edges.toArray()[0];
+        assertSame(v, vtov2.getSource());
+        assertEquals(0, vtov2.getSourceOutputPort());
+        
+        assertSame(v2, vtov2.getTarget());
+        assertEquals(0, vtov2.getTargetInputPort());
+    }
+
+    private static class TestOp<I, O> extends AbstractOplet<I, O> {
+
+        @Override
+        public void start() {
+        }
+
+        @Override
+        public List<? extends Consumer<I>> getInputs() {
+            return null;
+        }
+
+        @Override
+        public void close() throws Exception {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/JobContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/JobContext.java b/api/oplet/src/main/java/edgent/oplet/JobContext.java
deleted file mode 100644
index 6688478..0000000
--- a/api/oplet/src/main/java/edgent/oplet/JobContext.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet;
-
-/**
- * Information about an oplet invocation's job. 
- */
-public interface JobContext {
-    /**
-     * Get the runtime identifier for the job containing this {@link Oplet}.
-     * @return The job identifier for the application being executed.
-     */
-    String getId();
-    
-    /**
-     * Get the name of the job containing this {@link Oplet}.
-     * @return The job name for the application being executed.
-     */
-    String getName();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/Oplet.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/Oplet.java b/api/oplet/src/main/java/edgent/oplet/Oplet.java
deleted file mode 100644
index 913a3f4..0000000
--- a/api/oplet/src/main/java/edgent/oplet/Oplet.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet;
-
-import java.util.List;
-
-import edgent.function.Consumer;
-
-/**
- * Generic API for an oplet that processes streaming data on 0-N input ports
- * and produces 0-M output streams on its output ports. An input port may be
- * connected with any number of streams from other oplets. An output port may
- * connected to any number of input ports on other oplets.
- *
- * @param <I>
- *            Data container type for input tuples.
- * @param <O>
- *            Data container type for output tuples.
- */
-public interface Oplet<I, O> extends AutoCloseable {
-
-    /**
-     * Initialize the oplet.
-     * 
-     * @param context the OpletContext
-     * @throws Exception on failure
-     */
-    void initialize(OpletContext<I, O> context) throws Exception;
-
-    /**
-     * Start the oplet. Oplets must not submit any tuples not derived from
-     * input tuples until this method is called.
-     */
-    void start();
-
-    /**
-     * Get the input stream data handlers for this oplet. The number of handlers
-     * must equal the number of configured input ports. Each tuple
-     * arriving on an input port will be sent to the stream handler for that
-     * input port.
-     * 
-     * @return list of consumers
-     */
-    List<? extends Consumer<I>> getInputs();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/OpletContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/OpletContext.java b/api/oplet/src/main/java/edgent/oplet/OpletContext.java
deleted file mode 100644
index ab841ea..0000000
--- a/api/oplet/src/main/java/edgent/oplet/OpletContext.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet;
-
-import java.util.List;
-
-import edgent.execution.services.RuntimeServices;
-import edgent.function.Consumer;
-
-/**
- * Context information for the {@code Oplet}'s invocation context.
- * <P>
- * At execution time an oplet uses its invocation context to retrieve 
- * provided {@link #getService(Class) services}, 
- * {@link #getOutputs() output ports} for tuple submission
- * and {@link #getJobContext() job} information. 
- *
- * @param <I> tuple type of input streams
- * @param <O> tuple type of output streams
- */
-public interface OpletContext<I, O> extends RuntimeServices {
-
-	/**
-	 * Get the unique identifier (within the running job)
-	 * for this oplet.
-	 * @return unique identifier for this oplet
-	 */
-	String getId();
-
-    /**
-     * {@inheritDoc}
-     * <P>
-     * Get a service for this oplet invocation.
-     * 
-     * An invocation of an oplet may get access to services,
-     * which provide specific functionality, such as metrics.
-     * </P>
-     * 
-     */
-	@Override
-    <T> T getService(Class<T> serviceClass);
-    
-    /**
-     * Get the number of connected inputs to this oplet.
-     * @return number of connected inputs to this oplet.
-     */
-    int getInputCount();
-    
-    /**
-     * Get the number of connected outputs to this oplet.
-     * @return number of connected outputs to this oplet.
-     */
-    int getOutputCount();
-
-    /**
-     * Get the mechanism to submit tuples on an output port.
-     * 
-     * @return list of consumers
-     */
-    List<? extends Consumer<O>> getOutputs();
-
-    /**
-     * Get the oplet's output port context information.
-     * @return list of {@link OutputPortContext}, one for each output port.
-     */
-    List<OutputPortContext> getOutputContext();
-
-    /**
-     * Get the job hosting this oplet. 
-     * @return {@link JobContext} hosting this oplet invocation.
-     */
-    JobContext getJobContext();
-    
-    /**
-     * Creates a unique name within the context of the current runtime.
-     * <p>
-     * The default implementation adds a suffix composed of the package 
-     * name of this interface, the current job and oplet identifiers, 
-     * all separated by periods ({@code '.'}).  Developers should use this 
-     * method to avoid name clashes when they store or register the name in 
-     * an external container or registry.
-     *
-     * @param name name (possibly non-unique)
-     * @return unique name within the context of the current runtime.
-     */
-    String uniquify(String name);
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/OutputPortContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/OutputPortContext.java b/api/oplet/src/main/java/edgent/oplet/OutputPortContext.java
deleted file mode 100644
index 957543a..0000000
--- a/api/oplet/src/main/java/edgent/oplet/OutputPortContext.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet;
-
-/**
- * Information about an oplet output port. 
- */
-public interface OutputPortContext {
-    /**
-     * Get the alias of the output port if any.
-     * @return the alias; null if none.
-     */
-    String getAlias();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/AbstractOplet.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/AbstractOplet.java b/api/oplet/src/main/java/edgent/oplet/core/AbstractOplet.java
deleted file mode 100644
index 83dd7ee..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/AbstractOplet.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import edgent.oplet.Oplet;
-import edgent.oplet.OpletContext;
-
-public abstract class AbstractOplet<I, O> implements Oplet<I, O> {
-
-    private OpletContext<I, O> context;
-
-    @Override
-    public void initialize(OpletContext<I, O> context) {
-        this.context = context;
-    }
-
-    public final OpletContext<I, O> getOpletContext() {
-        return context;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/FanIn.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/FanIn.java b/api/oplet/src/main/java/edgent/oplet/core/FanIn.java
deleted file mode 100644
index 80ac3d5..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/FanIn.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.oplet.OpletContext;
-
-/**
- * FanIn oplet, merges multiple input ports into a single output port.
- * <P>
- * For each tuple received, {@code receiver.apply(T tuple, Integer index)}
- * is called. {@code index} is the tuple's input stream's index, where
- * {@code this} is index 0 followed by {@code others} in their order.
- * {@code receiver} either returns a tuple to emit on the output
- * stream or null.
- * </P> 
- * 
- * @param <T> Tuple type of input streams
- * @param <U> Tuple type of output stream
- */
-public class FanIn<T,U> extends AbstractOplet<T, U> {
-    private BiFunction<T, Integer, U> receiver;
-    private List<Consumer<T>> iportConsumers;
-    private Consumer<U> destination;
-    
-    public FanIn() {
-    }
-    
-    public FanIn(BiFunction<T, Integer, U> receiver) {
-      this.receiver = receiver;
-    }
-
-    @Override
-    public void initialize(OpletContext<T, U> context) {
-        super.initialize(context);
-        destination = context.getOutputs().get(0);
-       
-        // Create a consumer for each input port.
-        int numIports = getOpletContext().getInputCount();
-        if (iportConsumers == null) {
-          // each iport invokes the receiver
-          iportConsumers = new ArrayList<>(numIports);
-          for (int i = 0; i < numIports; i++)
-            iportConsumers.add(consumer(i));
-          iportConsumers = Collections.unmodifiableList(iportConsumers);
-        }
-    }
-    
-    /**
-     * Set the receiver function.  Must be called no later than as part
-     * of {@link #initialize(OpletContext)}.
-     * @param receiver function to receive tuples
-     */
-    protected void setReceiver(BiFunction<T, Integer, U> receiver) {
-      this.receiver = receiver;
-    }
-
-    @Override
-    public void start() {
-    }
-
-    @Override
-    public List<? extends Consumer<T>> getInputs() {
-      return iportConsumers;
-    }
-
-    /**
-     * Create a Consumer for the input port that invokes the
-     * receiver and submits a generated tuple, if any, to the output.
-     * @param iportIndex index of the input port
-     * @return the Consumer
-     */
-    protected Consumer<T> consumer(int iportIndex) {
-      return tuple -> { 
-        U result = receiver.apply(tuple, iportIndex);
-        if (result != null)
-          submit(result);
-      };
-    }
-
-    protected Consumer<U> getDestination() {
-        return destination;
-    }
-    
-    /**
-     * Submit a tuple to single output.
-     * @param tuple Tuple to be submitted.
-     */
-    protected void submit(U tuple) {
-        getDestination().accept(tuple);
-    }
-
-    @Override
-    public void close() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/FanOut.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/FanOut.java b/api/oplet/src/main/java/edgent/oplet/core/FanOut.java
deleted file mode 100644
index 8c7dbcf..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/FanOut.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-
-public final class FanOut<T> extends AbstractOplet<T, T> implements Consumer<T> {
-    
-    /**
-     * 
-     */
-    private static final long serialVersionUID = 1L;
-    
-    private List<? extends Consumer<T>> targets;
-    private int n;
-
-    @Override
-    public void start() {
-    }
-
-    @Override
-    public List<? extends Consumer<T>> getInputs() {
-        targets = getOpletContext().getOutputs();
-        n = targets.size();
-        return Collections.singletonList(this);
-    }
-    
-    @Override
-    public void accept(T tuple) {
-        for (int i = 0; i < n; i++)
-            targets.get(i).accept(tuple);
-    }
-
-    @Override
-    public void close() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Peek.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Peek.java b/api/oplet/src/main/java/edgent/oplet/core/Peek.java
deleted file mode 100644
index fd744bb..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Peek.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-/**
- * Oplet that allows a peek at each tuple and always forwards a tuple onto
- * its single output port.
- * 
- * {@link #peek(Object)} is called before the tuple is forwarded
- * and it is intended that the peek be a low cost operation
- * such as increasing a metric.
- *
- * @param <T>
- *            Type of the tuple.
- */
-public abstract class Peek<T> extends Pipe<T, T> {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public final void accept(T tuple) {
-        peek(tuple);
-        submit(tuple);
-    }
-
-    protected abstract void peek(T tuple);
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/PeriodicSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/PeriodicSource.java b/api/oplet/src/main/java/edgent/oplet/core/PeriodicSource.java
deleted file mode 100644
index c3a4988..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/PeriodicSource.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import edgent.execution.mbeans.PeriodMXBean;
-import edgent.execution.services.ControlService;
-import edgent.oplet.OpletContext;
-import edgent.oplet.OutputPortContext;
-
-public abstract class PeriodicSource<T> extends Source<T> implements Runnable, PeriodMXBean {
-  
-    // see comment in TStream.TYPE
-    private static final String TSTREAM_TYPE = /*TStream.TYPE*/"stream";
-
-    private long period;
-    private TimeUnit unit;
-    private ScheduledFuture<?> future;
-
-    protected PeriodicSource(long period, TimeUnit unit) {
-        this.period = period;
-        this.unit = unit;
-    }
-
-    @Override
-    public void initialize(OpletContext<Void, T> context) {
-        super.initialize(context);
-    }
-
-    @Override
-    public synchronized void start() {
-        ControlService cs = getOpletContext().getService(ControlService.class);
-        // TODO BUG HERE: the control alias needs to be unique across the
-        // entire provider instance (multiple topologies) because the ControlService
-        // is provider-wide, not topology specific.
-        // Scope it with just the jobId.  What's going to unregister this control?
-        if (cs != null)
-            cs.registerControl(TSTREAM_TYPE, getOpletContext().uniquify(getClass().getSimpleName()), 
-                    getAlias(), PeriodMXBean.class, this);
-        schedule(false);
-    }
-    
-    private String getAlias() {
-        OutputPortContext oc = getOpletContext().getOutputContext().get(0);
-        return oc.getAlias();
-    }
-
-    private synchronized void schedule(boolean delay) {
-        future = getOpletContext().getService(ScheduledExecutorService.class).scheduleAtFixedRate(
-                getRunnable(), delay ? getPeriod() : 0, getPeriod(), getUnit());
-    }
-
-    protected Runnable getRunnable() {
-        return this;
-    }
-
-    protected abstract void fetchTuples() throws Exception;
-
-    @Override
-    public void run() {
-        try {
-            fetchTuples();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public synchronized long getPeriod() {
-        return period;
-    }
-
-    @Override
-    public synchronized TimeUnit getUnit() {
-        return unit;
-    }
-
-    @Override
-    public synchronized void setPeriod(long period) {
-        setPeriod(period, getUnit());
-    }
-
-    @Override
-    public synchronized void setPeriod(long period, TimeUnit unit) {
-        if (period <= 0)
-            throw new IllegalArgumentException();
-        if (this.period != period || this.unit != unit) {
-            future.cancel(false);
-            this.period = period;
-            this.unit = unit;
-            schedule(true);
-        }  
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Pipe.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Pipe.java b/api/oplet/src/main/java/edgent/oplet/core/Pipe.java
deleted file mode 100644
index 04df7a0..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Pipe.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-import edgent.oplet.OpletContext;
-
-/**
- * Pipe oplet with a single input and output. 
- *
- * @param <I>
- *            Data container type for input tuples.
- * @param <O>
- *            Data container type for output tuples.
- */
-public abstract class Pipe<I, O> extends AbstractOplet<I, O>implements Consumer<I> {
-    private static final long serialVersionUID = 1L;
-
-    private Consumer<O> destination;
-
-    @Override
-    public void initialize(OpletContext<I, O> context) {
-        super.initialize(context);
-
-        destination = context.getOutputs().get(0);
-    }
-
-    @Override
-    public void start() {
-    }
-
-    @Override
-    public List<Consumer<I>> getInputs() {
-        return Collections.singletonList(this);
-    }
-
-    protected Consumer<O> getDestination() {
-        return destination;
-    }
-    
-    /**
-     * Submit a tuple to single output.
-     * @param tuple Tuple to be submitted.
-     */
-    protected void submit(O tuple) {
-        getDestination().accept(tuple);
-    }
-}