You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:36 UTC

[35/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/test/java/edgent/test/window/StateTest.java
----------------------------------------------------------------------
diff --git a/api/window/src/test/java/edgent/test/window/StateTest.java b/api/window/src/test/java/edgent/test/window/StateTest.java
deleted file mode 100644
index 6c459f9..0000000
--- a/api/window/src/test/java/edgent/test/window/StateTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.window;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-import edgent.function.Supplier;
-import edgent.window.PartitionedState;
-
-
-public class StateTest {
-    
-    /**
-     * Test PartitionedState with immutable state.
-     */
-    @Test
-    public void partitionedImmutableStateTest() {
-        
-        TestState<Integer> state = new TestState<>(() -> 73);
- 
-        assertEquals(73, state.getState("A").intValue());
-        assertEquals(73, state.getState("B").intValue());
-        
-        assertEquals(73, state.removeState("A").intValue());
-        // and it reverts back to the initial value.
-        assertEquals(73, state.getState("A").intValue());
-        
-        assertEquals(73, state.setState("B", 102).intValue());
-        assertEquals(102, state.getState("B").intValue());
-        
-        assertEquals(73, state.getState("A").intValue());
-    }
-    
-    /**
-     * Test PartitionedState with mutable state, basically
-     * checking that the state does not get confused.
-     */
-    @Test
-    public void partitionedMutableStateTest() {
-        
-        TestState<int[]> state = new TestState<>(() -> new int[1]);
- 
-        assertEquals(0, state.getState("A")[0]);
-        assertEquals(0, state.getState("B")[0]);
-        
-        // change A, must not change B
-        state.getState("A")[0] = 73;
-        assertEquals(73, state.getState("A")[0]);
-        assertEquals(0, state.getState("B")[0]);
-        
-        // change B, must not change A
-        state.getState("B")[0] = 102;
-        assertEquals(73, state.getState("A")[0]);
-        assertEquals(102, state.getState("B")[0]);
-        
-        assertEquals(73, state.removeState("A")[0]);
-        assertEquals(0, state.getState("A")[0]);
-        
-        int[] newB = new int[1];
-        newB[0] = 9214;
-        assertEquals(102, state.setState("B", newB)[0]);
-        assertEquals(9214, state.getState("B")[0]);
-    }
-    
-    
-    private static class TestState<S> extends PartitionedState<String, S> {
-
-        protected TestState(Supplier<S> initialState) {
-            super(initialState);
-        }
-        @Override
-        public S getState(String key) {
-            return super.getState(key);
-        }
-        @Override
-        public S removeState(String key) {
-            return super.removeState(key);
-        }
-        @Override
-        public S setState(String key, S state) {
-            return super.setState(key, state);
-        } 
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/test/java/edgent/test/window/WindowTest.java
----------------------------------------------------------------------
diff --git a/api/window/src/test/java/edgent/test/window/WindowTest.java b/api/window/src/test/java/edgent/test/window/WindowTest.java
deleted file mode 100644
index 66cc1ed59..0000000
--- a/api/window/src/test/java/edgent/test/window/WindowTest.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.window;
-
-import static edgent.function.Functions.unpartitioned;
-import static edgent.window.Policies.alwaysInsert;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-import edgent.function.BiConsumer;
-import edgent.window.InsertionTimeList;
-import edgent.window.Policies;
-import edgent.window.Window;
-import edgent.window.Windows;
-
-
-public class WindowTest {
-	
-    /**
-     * Verifies that the state of the window is correct after each tuple offer.
-     */
-    @Test
-    public void lastCountTest(){
-        final int COUNT = 100;
-        // The window implementation
-        Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, unpartitioned());
-        // The states of the window as it slides
-        LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
-        
-        // A processor that records the states of the window
-        BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
-            incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
-        };
-        window.registerPartitionProcessor(wp);
-        
-        // Generate sliding window correct incremental state to compare
-        // against the window's
-        LinkedList<LinkedList<Integer>> correctWindowStates = new LinkedList<>();
-        LinkedList<Integer> previous = null;
-        LinkedList<Integer> current = null;
-        for(int i = 0; i < COUNT; i++){
-            current = new LinkedList<>();         
-            if(previous != null)
-                current.addAll(previous);
-            
-            current.addLast(i);
-            if(current.size() > 10){
-                current.removeFirst();
-            }
-            previous = current;
-            correctWindowStates.addLast(current);              
-        }
-        
-        // Add tuples to window, populating the incrementalWindowStates list.
-        for(int i = 0; i < COUNT; i++){
-            window.insert(i);
-        }
-
-        // Compare correct window states to the window implementation's
-        assertTrue(correctWindowStates.size() == incrementalWindowStates.size());
-        for(int i = 0; i < correctWindowStates.size(); i++){
-            assertTrue(correctWindowStates.get(i).containsAll(incrementalWindowStates.get(i)));
-            assertTrue(incrementalWindowStates.get(i).containsAll(correctWindowStates.get(i)));
-        }
-    }
-    
-    @Test
-    public void keyedWindowTest(){
-        final int COUNT = 1000;
-        // The window implementation
-     // The window implementation
-        Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, tuple->tuple%10);
-        
-        
-        // The states of the window as it slides
-        LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
-        
-        // A processor that records the states of the window
-        BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
-            incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
-        };    
-        window.registerPartitionProcessor(wp); 
-        
-        Map<Integer, LinkedList<Integer>> correctPartitionedStates = new HashMap<>();
-        List<List<Integer> > correctWindowStates = new ArrayList<>();
-        for(int i = 0; i < 10; i++){
-            correctPartitionedStates.put(i, new LinkedList<>());
-        }
-        for(int i = 0; i < COUNT; i++){
-            correctPartitionedStates.get(i%10).add(i);
-            if(correctPartitionedStates.get(i%10).size() > 10){
-                correctPartitionedStates.get(i%10).removeFirst();
-            }
-            correctWindowStates.add(new ArrayList<>(correctPartitionedStates.get(i%10)));
-            window.insert(i);
-        }
-        
-        // Compare correct window states to the window implementation's
-        assertTrue(correctWindowStates.size() == incrementalWindowStates.size());
-        for(int i = 0; i < correctWindowStates.size(); i++){
-            assertTrue(correctWindowStates.get(i).containsAll(incrementalWindowStates.get(i)));
-            assertTrue(incrementalWindowStates.get(i).containsAll(correctWindowStates.get(i)));
-        }    
-    }
-    
-    @Test
-    public void accessPartitionKeyTest(){
-        LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
-        
-        Window<Integer, Integer, ? extends List<Integer>> window = Windows.<Integer, Integer, LinkedList<Integer>>window(
-        (partition, tuple) -> {
-            if (partition.getKey().equals(1) || partition.getKey().equals(3)) {
-                return false;
-            }
-            return true;
-        },
-        (partition, tuple) -> { // Contents policy
-
-                }, 
-        (partition) -> { // Evict determiner
-                    partition.getContents().clear();
-                }, 
-        Policies.processOnInsert(), 
-        tuple -> tuple, 
-        () -> new LinkedList<Integer>());
-
-        // A processor that records the states of the window
-        BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
-            incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
-        };    
-        window.registerPartitionProcessor(wp); 
-        
-        
-        for(Integer i = 0; i < 5; i++){
-            window.insert(i);
-        }
-        
-        assertTrue(incrementalWindowStates.size() == 3);
-        assertTrue(incrementalWindowStates.get(0).get(0)==0);
-        assertTrue(incrementalWindowStates.get(1).get(0)==2);
-        assertTrue(incrementalWindowStates.get(2).get(0)==4);
-        
-    }
-    
-    @Test
-    public void concurrentWindowAccessTest() throws InterruptedException {
-       
-        Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, tuple -> 0);
-        
-        window.registerPartitionProcessor((tuples, key) -> {
-            // ensure that the window state doesn't change after .05 seconds
-            // Copy window state
-            LinkedList<Integer> list_copy = new LinkedList<Integer>(tuples);
-            
-            // Wait .05 seconds
-            try {
-                Thread.sleep(50);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-            
-            // Verify that the partition is unchanged
-            assertTrue(list_copy.containsAll(tuples));
-            assertTrue(tuples.containsAll(list_copy));
-        });
-           
-        
-        // Run for five seconds.
-        long finishTime = System.currentTimeMillis() + 3000;
-        
-        List<Thread> threads = new ArrayList<Thread>();
-        
-        // Ten threads concurrently attempt to insert tuples into the window
-        for(int i = 0; i < 10; i++){
-            threads.add(new Thread(new Runnable(){
-                Random r = new Random();
-                @Override
-                public void run() {
-                    
-                    while(System.currentTimeMillis() < finishTime){                       
-                        try{
-                            window.insert(r.nextInt());
-                        }
-                        catch(ConcurrentModificationException cme){
-                            org.junit.Assert.fail("State of window changed while processing");
-                        }
-                    }  
-                }
-                
-            }));
-        }
-        for(Thread thread : threads){
-            thread.start();
-            Thread.sleep(10);
-        }
-        for(Thread thread : threads)
-            thread.join();
-    }
-    
-    
-    @Test
-    public void noWaitConcurrentWindowAccessTest() throws InterruptedException {
-        Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(100, tuple -> 0);
-        window.registerPartitionProcessor((tuples, key) -> {});
-        long finishTime = System.currentTimeMillis() + 3000;
-        
-        List<Thread> threads = new ArrayList<Thread>();
-        
-        // Ten threads concurrently attempt to insert tuples into the window
-        for(int i = 0; i < 10; i++){
-            threads.add(new Thread(new Runnable(){
-                Random r = new Random();
-                @Override
-                public void run() {
-                    while(System.currentTimeMillis() < finishTime){                       
-                        try{
-                            window.insert(r.nextInt());
-                        }
-                        catch(ConcurrentModificationException cme){
-                            org.junit.Assert.fail("State of window changed while processing");
-                        }
-                    }  
-                }
-                
-            }));
-        }
-        for(Thread thread : threads){
-            thread.start();
-            Thread.sleep(10);
-        }
-        for(Thread thread : threads)
-            thread.join();
-    }
-    
-    @Test
-    public void timeActionTest() throws InterruptedException {
-		// Timing variances on shared machines can cause this test to fail
-    	assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
-        List<Long> diffs = Collections.synchronizedList(new ArrayList<>());
-        List<Boolean> initialized = Collections.synchronizedList(new ArrayList<>());
-        initialized.add(false);
-        
-        Window<Long, Integer, InsertionTimeList<Long>> window = // new TimeWindowImpl<Long, Integer, LinkedList<Long>>(
-        Windows.window(
-                Policies.alwaysInsert(), // insertion policy
-                Policies.scheduleEvictIfEmpty(1000, TimeUnit.MILLISECONDS), 
-                // Policies.evictOlderThan(1000, TimeUnit.MILLISECONDS), 
-                Policies.evictOlderWithProcess(1000, TimeUnit.MILLISECONDS), 
-                (partition, tuple) -> {
-                    if(initialized.get(0).booleanValue() == false){
-                        initialized.set(0, true);
-                        ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
-                        ses.scheduleAtFixedRate(() -> {partition.process();}, 0, 1000, TimeUnit.MILLISECONDS);
-                    }}, 
-                unpartitioned(),
-                () -> new InsertionTimeList<Long>());
-        
-        window.registerPartitionProcessor((tuples, key) -> {
-            if(tuples.size() > 1)
-                diffs.add(tuples.get(tuples.size()-1) - tuples.get(0));
-        });
-        
-        window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
-        
-        long endTime = System.currentTimeMillis() + 8000;
-        List<Thread> threads = new ArrayList<>();
-        int NUM_THREADS = 10;
-        // Create 10 threads. Each inserts at 1,000 Hz
-        for(int i = 0; i < NUM_THREADS; i++){
-            threads.add(new Thread(new Runnable() {     
-                @Override
-                public void run() {
-                    while(System.currentTimeMillis() < endTime){
-                        window.insert(System.currentTimeMillis());
-                        try {
-                            Thread.sleep(1);
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            }));
-        }
-        for(int i = 0; i < NUM_THREADS; i++){
-            threads.get(i).start();
-        }
-        for(int i = 0; i < NUM_THREADS; i++){
-            threads.get(i).join();
-        }
-        assertOnTimeEvictions(diffs);
-        
-    }
-    
-    @SuppressWarnings("serial")
-    @Test
-    public void countBatchWindowTest(){
-        List<Integer> numBatches = new LinkedList<>();
-        Window<Integer, Integer, List<Integer>> window =
-                Windows.window(
-                        alwaysInsert(),
-                        Policies.doNothing(),
-                        Policies.evictAll(),
-                        Policies.processWhenFullAndEvict(113),
-                        tuple -> 0,
-                        () -> new ArrayList<Integer>());
-        window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
-            @Override
-            public void accept(List<Integer> t, Integer u) {
-                numBatches.add(1);
-            }
-        });
-        for(int i = 0; i < 1000; i++){
-            window.insert(i);
-        }
-        
-        assertTrue(numBatches.size() == 8);
-    }
-
-    @SuppressWarnings("serial")
-    @Test
-    public void timeBatchWindowTest() throws InterruptedException{
-        List<Long> numBatches = new LinkedList<>();
-        
-        ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
-        Window<Integer, Integer, List<Integer>> window =
-                Windows.window(
-                        alwaysInsert(),
-                        Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
-                        Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
-                        (partiton, tuple) -> {},
-                        tuple -> 0,
-                        () -> new ArrayList<Integer>());
-        
-        window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
-            @Override
-            public void accept(List<Integer> t, Integer u) {
-                numBatches.add((long)t.size());
-            }
-        });
-        
-        window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
-
-        ses.scheduleAtFixedRate(() -> {
-            window.insert(1);
-        }, 0, 10, TimeUnit.MILLISECONDS);
-
-        Thread.sleep(11000);
-        double tolerance = .08;
-        for(int i = 0; i < numBatches.size(); i++){
-            assertTrue("Values:" + numBatches.toString(), withinTolerance(100.0, numBatches.get(i).doubleValue(), tolerance));
-        }    
-    }
-    
-    @SuppressWarnings("serial")
-    @Test
-    public void keyedTimeBatchWindowTest() throws InterruptedException{
-        Map<Integer, List<Long> > numBatches = Collections.synchronizedMap(new HashMap<>());
-        for(int i = 0; i < 5; i++)
-        	numBatches.put(i, new LinkedList<Long>());
-        
-        ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
-        Window<Integer, Integer, List<Integer>> window =
-                Windows.window(
-                        alwaysInsert(),
-                        Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
-                        Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
-                        (partiton, tuple) -> {},
-                        tuple -> tuple,
-                        () -> new ArrayList<Integer>());
-        
-        window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
-            @Override
-            public void accept(List<Integer> t, Integer u) {
-            	List<Long> l = numBatches.get(u);
-                l.add((long)t.size());
-            }
-        });
-        
-        window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
-
-        int[] count = {1};
-        ses.scheduleAtFixedRate(() -> {
-            window.insert(count[0]++ % 5);
-        }, 0, 1, TimeUnit.MILLISECONDS);
-
-        Thread.sleep(11000);
-        double tolerance = .08;
-        
-        for(Integer key : numBatches.keySet()){
-        	List<Long> batch = numBatches.get(key);
-        	for(Long l : batch){
-        		assertTrue("Values:" + batch.toString(), withinTolerance(200.0, l.doubleValue(), tolerance));
-        	}
-        }
-    }
-   
-    
-    @SuppressWarnings("serial")
-    @Test
-    public void timeBatchEnsureUnique() throws InterruptedException{
-        List<List<Integer>> batches = Collections.synchronizedList(new LinkedList<>());
-
-        ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
-        Window<Integer, Integer, List<Integer>> window =
-                Windows.window(
-                        alwaysInsert(),
-                        Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
-                        Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
-                        (partiton, tuple) -> {},
-                        tuple -> 0,
-                        () -> new ArrayList<Integer>());
-        
-        window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
-            @Override
-            public void accept(List<Integer> t, Integer u) {
-                batches.add(new ArrayList<Integer>(t));
-            }
-        });
-        
-        window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
-
-        ses.scheduleAtFixedRate(new Runnable(){
-            private int count = 0;
-            @Override
-            public void run() {
-                if(count < 1000){
-                    window.insert(count++);
-                }
-            }
-            
-        }, 0, 10, TimeUnit.MILLISECONDS);
-
-        Thread.sleep(11000);
-        int numTuples = 0;
-        for(int i = 0; i < batches.size() - 1; i++){
-            List<Integer> batch = batches.get(i);
-            numTuples += batch.size();
-            for(int j = i + 1; j < batches.size(); j++){
-                assertTrue("Batches have overlapping tuples", Collections.disjoint(batches.get(i), batches.get(j)));
-            }
-        }
-        
-        numTuples += batches.get(batches.size() -1).size();
-        assertTrue("Number of tuples submitted (1000) != number of tuples processed in batch (" + numTuples + ")", numTuples == 1000);
-    }
-    
-    private void assertOnTimeEvictions(List<Long> diffs) {
-        double tolerance = .08;
-        for(int i = 1; i < diffs.size(); i++){
-            assertTrue(withinTolerance(1000.0, diffs.get(i).doubleValue(), tolerance));
-        }
-        
-    }
-
-    public static boolean withinTolerance(double expected, Double actual, double tolerance) {
-        double lowBound = (1.0 - tolerance) * expected;
-        double highBound = (1.0 + tolerance) * expected;
-        return (actual < highBound && actual > lowBound);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/test/java/org/apache/edgent/test/window/StateTest.java
----------------------------------------------------------------------
diff --git a/api/window/src/test/java/org/apache/edgent/test/window/StateTest.java b/api/window/src/test/java/org/apache/edgent/test/window/StateTest.java
new file mode 100644
index 0000000..f324152
--- /dev/null
+++ b/api/window/src/test/java/org/apache/edgent/test/window/StateTest.java
@@ -0,0 +1,101 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.window;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.window.PartitionedState;
+import org.junit.Test;
+
+
+public class StateTest {
+    
+    /**
+     * Test PartitionedState with immutable state.
+     */
+    @Test
+    public void partitionedImmutableStateTest() {
+        
+        TestState<Integer> state = new TestState<>(() -> 73);
+ 
+        assertEquals(73, state.getState("A").intValue());
+        assertEquals(73, state.getState("B").intValue());
+        
+        assertEquals(73, state.removeState("A").intValue());
+        // and it reverts back to the initial value.
+        assertEquals(73, state.getState("A").intValue());
+        
+        assertEquals(73, state.setState("B", 102).intValue());
+        assertEquals(102, state.getState("B").intValue());
+        
+        assertEquals(73, state.getState("A").intValue());
+    }
+    
+    /**
+     * Test PartitionedState with mutable state, basically
+     * checking that the state does not get confused.
+     */
+    @Test
+    public void partitionedMutableStateTest() {
+        
+        TestState<int[]> state = new TestState<>(() -> new int[1]);
+ 
+        assertEquals(0, state.getState("A")[0]);
+        assertEquals(0, state.getState("B")[0]);
+        
+        // change A, must not change B
+        state.getState("A")[0] = 73;
+        assertEquals(73, state.getState("A")[0]);
+        assertEquals(0, state.getState("B")[0]);
+        
+        // change B, must not change A
+        state.getState("B")[0] = 102;
+        assertEquals(73, state.getState("A")[0]);
+        assertEquals(102, state.getState("B")[0]);
+        
+        assertEquals(73, state.removeState("A")[0]);
+        assertEquals(0, state.getState("A")[0]);
+        
+        int[] newB = new int[1];
+        newB[0] = 9214;
+        assertEquals(102, state.setState("B", newB)[0]);
+        assertEquals(9214, state.getState("B")[0]);
+    }
+    
+    
+    private static class TestState<S> extends PartitionedState<String, S> {
+
+        protected TestState(Supplier<S> initialState) {
+            super(initialState);
+        }
+        @Override
+        public S getState(String key) {
+            return super.getState(key);
+        }
+        @Override
+        public S removeState(String key) {
+            return super.removeState(key);
+        }
+        @Override
+        public S setState(String key, S state) {
+            return super.setState(key, state);
+        } 
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java
----------------------------------------------------------------------
diff --git a/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java b/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java
new file mode 100644
index 0000000..f075b07
--- /dev/null
+++ b/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java
@@ -0,0 +1,491 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.window;
+
+import static org.apache.edgent.function.Functions.unpartitioned;
+import static org.apache.edgent.window.Policies.alwaysInsert;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.window.InsertionTimeList;
+import org.apache.edgent.window.Policies;
+import org.apache.edgent.window.Window;
+import org.apache.edgent.window.Windows;
+import org.junit.Test;
+
+
+public class WindowTest {
+	
+    /**
+     * Verifies that the state of the window is correct after each tuple offer.
+     */
+    @Test
+    public void lastCountTest(){
+        final int COUNT = 100;
+        // The window implementation
+        Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, unpartitioned());
+        // The states of the window as it slides
+        LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
+        
+        // A processor that records the states of the window
+        BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
+            incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
+        };
+        window.registerPartitionProcessor(wp);
+        
+        // Generate sliding window correct incremental state to compare
+        // against the window's
+        LinkedList<LinkedList<Integer>> correctWindowStates = new LinkedList<>();
+        LinkedList<Integer> previous = null;
+        LinkedList<Integer> current = null;
+        for(int i = 0; i < COUNT; i++){
+            current = new LinkedList<>();         
+            if(previous != null)
+                current.addAll(previous);
+            
+            current.addLast(i);
+            if(current.size() > 10){
+                current.removeFirst();
+            }
+            previous = current;
+            correctWindowStates.addLast(current);              
+        }
+        
+        // Add tuples to window, populating the incrementalWindowStates list.
+        for(int i = 0; i < COUNT; i++){
+            window.insert(i);
+        }
+
+        // Compare correct window states to the window implementation's
+        assertTrue(correctWindowStates.size() == incrementalWindowStates.size());
+        for(int i = 0; i < correctWindowStates.size(); i++){
+            assertTrue(correctWindowStates.get(i).containsAll(incrementalWindowStates.get(i)));
+            assertTrue(incrementalWindowStates.get(i).containsAll(correctWindowStates.get(i)));
+        }
+    }
+    
+    @Test
+    public void keyedWindowTest(){
+        final int COUNT = 1000;
+        // The window implementation
+     // The window implementation
+        Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, tuple->tuple%10);
+        
+        
+        // The states of the window as it slides
+        LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
+        
+        // A processor that records the states of the window
+        BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
+            incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
+        };    
+        window.registerPartitionProcessor(wp); 
+        
+        Map<Integer, LinkedList<Integer>> correctPartitionedStates = new HashMap<>();
+        List<List<Integer> > correctWindowStates = new ArrayList<>();
+        for(int i = 0; i < 10; i++){
+            correctPartitionedStates.put(i, new LinkedList<>());
+        }
+        for(int i = 0; i < COUNT; i++){
+            correctPartitionedStates.get(i%10).add(i);
+            if(correctPartitionedStates.get(i%10).size() > 10){
+                correctPartitionedStates.get(i%10).removeFirst();
+            }
+            correctWindowStates.add(new ArrayList<>(correctPartitionedStates.get(i%10)));
+            window.insert(i);
+        }
+        
+        // Compare correct window states to the window implementation's
+        assertTrue(correctWindowStates.size() == incrementalWindowStates.size());
+        for(int i = 0; i < correctWindowStates.size(); i++){
+            assertTrue(correctWindowStates.get(i).containsAll(incrementalWindowStates.get(i)));
+            assertTrue(incrementalWindowStates.get(i).containsAll(correctWindowStates.get(i)));
+        }    
+    }
+    
+    @Test
+    public void accessPartitionKeyTest(){
+        LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
+        
+        Window<Integer, Integer, ? extends List<Integer>> window = Windows.<Integer, Integer, LinkedList<Integer>>window(
+        (partition, tuple) -> {
+            if (partition.getKey().equals(1) || partition.getKey().equals(3)) {
+                return false;
+            }
+            return true;
+        },
+        (partition, tuple) -> { // Contents policy
+
+                }, 
+        (partition) -> { // Evict determiner
+                    partition.getContents().clear();
+                }, 
+        Policies.processOnInsert(), 
+        tuple -> tuple, 
+        () -> new LinkedList<Integer>());
+
+        // A processor that records the states of the window
+        BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
+            incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
+        };    
+        window.registerPartitionProcessor(wp); 
+        
+        
+        for(Integer i = 0; i < 5; i++){
+            window.insert(i);
+        }
+        
+        assertTrue(incrementalWindowStates.size() == 3);
+        assertTrue(incrementalWindowStates.get(0).get(0)==0);
+        assertTrue(incrementalWindowStates.get(1).get(0)==2);
+        assertTrue(incrementalWindowStates.get(2).get(0)==4);
+        
+    }
+    
+    @Test
+    public void concurrentWindowAccessTest() throws InterruptedException {
+       
+        Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, tuple -> 0);
+        
+        window.registerPartitionProcessor((tuples, key) -> {
+            // ensure that the window state doesn't change after .05 seconds
+            // Copy window state
+            LinkedList<Integer> list_copy = new LinkedList<Integer>(tuples);
+            
+            // Wait .05 seconds
+            try {
+                Thread.sleep(50);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            
+            // Verify that the partition is unchanged
+            assertTrue(list_copy.containsAll(tuples));
+            assertTrue(tuples.containsAll(list_copy));
+        });
+           
+        
+        // Run for five seconds.
+        long finishTime = System.currentTimeMillis() + 3000;
+        
+        List<Thread> threads = new ArrayList<Thread>();
+        
+        // Ten threads concurrently attempt to insert tuples into the window
+        for(int i = 0; i < 10; i++){
+            threads.add(new Thread(new Runnable(){
+                Random r = new Random();
+                @Override
+                public void run() {
+                    
+                    while(System.currentTimeMillis() < finishTime){                       
+                        try{
+                            window.insert(r.nextInt());
+                        }
+                        catch(ConcurrentModificationException cme){
+                            org.junit.Assert.fail("State of window changed while processing");
+                        }
+                    }  
+                }
+                
+            }));
+        }
+        for(Thread thread : threads){
+            thread.start();
+            Thread.sleep(10);
+        }
+        for(Thread thread : threads)
+            thread.join();
+    }
+    
+    
+    @Test
+    public void noWaitConcurrentWindowAccessTest() throws InterruptedException {
+        Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(100, tuple -> 0);
+        window.registerPartitionProcessor((tuples, key) -> {});
+        long finishTime = System.currentTimeMillis() + 3000;
+        
+        List<Thread> threads = new ArrayList<Thread>();
+        
+        // Ten threads concurrently attempt to insert tuples into the window
+        for(int i = 0; i < 10; i++){
+            threads.add(new Thread(new Runnable(){
+                Random r = new Random();
+                @Override
+                public void run() {
+                    while(System.currentTimeMillis() < finishTime){                       
+                        try{
+                            window.insert(r.nextInt());
+                        }
+                        catch(ConcurrentModificationException cme){
+                            org.junit.Assert.fail("State of window changed while processing");
+                        }
+                    }  
+                }
+                
+            }));
+        }
+        for(Thread thread : threads){
+            thread.start();
+            Thread.sleep(10);
+        }
+        for(Thread thread : threads)
+            thread.join();
+    }
+    
+    @Test
+    public void timeActionTest() throws InterruptedException {
+		// Timing variances on shared machines can cause this test to fail
+    	assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+
+        List<Long> diffs = Collections.synchronizedList(new ArrayList<>());
+        List<Boolean> initialized = Collections.synchronizedList(new ArrayList<>());
+        initialized.add(false);
+        
+        Window<Long, Integer, InsertionTimeList<Long>> window = // new TimeWindowImpl<Long, Integer, LinkedList<Long>>(
+        Windows.window(
+                Policies.alwaysInsert(), // insertion policy
+                Policies.scheduleEvictIfEmpty(1000, TimeUnit.MILLISECONDS), 
+                // Policies.evictOlderThan(1000, TimeUnit.MILLISECONDS), 
+                Policies.evictOlderWithProcess(1000, TimeUnit.MILLISECONDS), 
+                (partition, tuple) -> {
+                    if(initialized.get(0).booleanValue() == false){
+                        initialized.set(0, true);
+                        ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
+                        ses.scheduleAtFixedRate(() -> {partition.process();}, 0, 1000, TimeUnit.MILLISECONDS);
+                    }}, 
+                unpartitioned(),
+                () -> new InsertionTimeList<Long>());
+        
+        window.registerPartitionProcessor((tuples, key) -> {
+            if(tuples.size() > 1)
+                diffs.add(tuples.get(tuples.size()-1) - tuples.get(0));
+        });
+        
+        window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
+        
+        long endTime = System.currentTimeMillis() + 8000;
+        List<Thread> threads = new ArrayList<>();
+        int NUM_THREADS = 10;
+        // Create 10 threads. Each inserts at 1,000 Hz
+        for(int i = 0; i < NUM_THREADS; i++){
+            threads.add(new Thread(new Runnable() {     
+                @Override
+                public void run() {
+                    while(System.currentTimeMillis() < endTime){
+                        window.insert(System.currentTimeMillis());
+                        try {
+                            Thread.sleep(1);
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            }));
+        }
+        for(int i = 0; i < NUM_THREADS; i++){
+            threads.get(i).start();
+        }
+        for(int i = 0; i < NUM_THREADS; i++){
+            threads.get(i).join();
+        }
+        assertOnTimeEvictions(diffs);
+        
+    }
+    
+    @SuppressWarnings("serial")
+    @Test
+    public void countBatchWindowTest(){
+        List<Integer> numBatches = new LinkedList<>();
+        Window<Integer, Integer, List<Integer>> window =
+                Windows.window(
+                        alwaysInsert(),
+                        Policies.doNothing(),
+                        Policies.evictAll(),
+                        Policies.processWhenFullAndEvict(113),
+                        tuple -> 0,
+                        () -> new ArrayList<Integer>());
+        window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
+            @Override
+            public void accept(List<Integer> t, Integer u) {
+                numBatches.add(1);
+            }
+        });
+        for(int i = 0; i < 1000; i++){
+            window.insert(i);
+        }
+        
+        assertTrue(numBatches.size() == 8);
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void timeBatchWindowTest() throws InterruptedException{
+        List<Long> numBatches = new LinkedList<>();
+        
+        ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
+        Window<Integer, Integer, List<Integer>> window =
+                Windows.window(
+                        alwaysInsert(),
+                        Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
+                        Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
+                        (partiton, tuple) -> {},
+                        tuple -> 0,
+                        () -> new ArrayList<Integer>());
+        
+        window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
+            @Override
+            public void accept(List<Integer> t, Integer u) {
+                numBatches.add((long)t.size());
+            }
+        });
+        
+        window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
+
+        ses.scheduleAtFixedRate(() -> {
+            window.insert(1);
+        }, 0, 10, TimeUnit.MILLISECONDS);
+
+        Thread.sleep(11000);
+        double tolerance = .08;
+        for(int i = 0; i < numBatches.size(); i++){
+            assertTrue("Values:" + numBatches.toString(), withinTolerance(100.0, numBatches.get(i).doubleValue(), tolerance));
+        }    
+    }
+    
+    @SuppressWarnings("serial")
+    @Test
+    public void keyedTimeBatchWindowTest() throws InterruptedException{
+        Map<Integer, List<Long> > numBatches = Collections.synchronizedMap(new HashMap<>());
+        for(int i = 0; i < 5; i++)
+        	numBatches.put(i, new LinkedList<Long>());
+        
+        ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
+        Window<Integer, Integer, List<Integer>> window =
+                Windows.window(
+                        alwaysInsert(),
+                        Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
+                        Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
+                        (partiton, tuple) -> {},
+                        tuple -> tuple,
+                        () -> new ArrayList<Integer>());
+        
+        window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
+            @Override
+            public void accept(List<Integer> t, Integer u) {
+            	List<Long> l = numBatches.get(u);
+                l.add((long)t.size());
+            }
+        });
+        
+        window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
+
+        int[] count = {1};
+        ses.scheduleAtFixedRate(() -> {
+            window.insert(count[0]++ % 5);
+        }, 0, 1, TimeUnit.MILLISECONDS);
+
+        Thread.sleep(11000);
+        double tolerance = .08;
+        
+        for(Integer key : numBatches.keySet()){
+        	List<Long> batch = numBatches.get(key);
+        	for(Long l : batch){
+        		assertTrue("Values:" + batch.toString(), withinTolerance(200.0, l.doubleValue(), tolerance));
+        	}
+        }
+    }
+   
+    
+    @SuppressWarnings("serial")
+    @Test
+    public void timeBatchEnsureUnique() throws InterruptedException{
+        List<List<Integer>> batches = Collections.synchronizedList(new LinkedList<>());
+
+        ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
+        Window<Integer, Integer, List<Integer>> window =
+                Windows.window(
+                        alwaysInsert(),
+                        Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
+                        Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
+                        (partiton, tuple) -> {},
+                        tuple -> 0,
+                        () -> new ArrayList<Integer>());
+        
+        window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
+            @Override
+            public void accept(List<Integer> t, Integer u) {
+                batches.add(new ArrayList<Integer>(t));
+            }
+        });
+        
+        window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
+
+        ses.scheduleAtFixedRate(new Runnable(){
+            private int count = 0;
+            @Override
+            public void run() {
+                if(count < 1000){
+                    window.insert(count++);
+                }
+            }
+            
+        }, 0, 10, TimeUnit.MILLISECONDS);
+
+        Thread.sleep(11000);
+        int numTuples = 0;
+        for(int i = 0; i < batches.size() - 1; i++){
+            List<Integer> batch = batches.get(i);
+            numTuples += batch.size();
+            for(int j = i + 1; j < batches.size(); j++){
+                assertTrue("Batches have overlapping tuples", Collections.disjoint(batches.get(i), batches.get(j)));
+            }
+        }
+        
+        numTuples += batches.get(batches.size() -1).size();
+        assertTrue("Number of tuples submitted (1000) != number of tuples processed in batch (" + numTuples + ")", numTuples == 1000);
+    }
+    
+    private void assertOnTimeEvictions(List<Long> diffs) {
+        double tolerance = .08;
+        for(int i = 1; i < diffs.size(); i++){
+            assertTrue(withinTolerance(1000.0, diffs.get(i).doubleValue(), tolerance));
+        }
+        
+    }
+
+    public static boolean withinTolerance(double expected, Double actual, double tolerance) {
+        double lowBound = (1.0 - tolerance) * expected;
+        double highBound = (1.0 + tolerance) * expected;
+        return (actual < highBound && actual > lowBound);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/edgent/apps/iot/IotDevicePubSub.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/edgent/apps/iot/IotDevicePubSub.java b/apps/iot/src/main/java/edgent/apps/iot/IotDevicePubSub.java
deleted file mode 100644
index c7f8fea..0000000
--- a/apps/iot/src/main/java/edgent/apps/iot/IotDevicePubSub.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.apps.iot;
-
-import static edgent.topology.services.ApplicationService.SYSTEM_APP_PREFIX;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.iot.Events;
-import edgent.connectors.iot.IotDevice;
-import edgent.connectors.iot.QoS;
-import edgent.connectors.pubsub.PublishSubscribe;
-import edgent.topology.TStream;
-import edgent.topology.TopologyElement;
-
-/**
- * Application sharing an {@code IotDevice} through publish-subscribe. <BR>
- * This application shares an {@link IotDevice} across multiple running
- * jobs. This allows a single connection to a back-end message hub to be shared
- * across multiple independent applications, without having to build a single
- * topology.
- * <P>
- * Applications coded to {@link IotDevice} obtain a topology specific
- * {@code IotDevice} using {@link #addIotDevice(TopologyElement)}. This returned
- * device will route events and commands to/from the actual message hub
- * {@code IotDevice} through publish-subscribe.
- * <P>
- * An instance of this application is created by first creating a new topology and
- * then creating a {@link IotDevice} specific to the desired message hub. Then
- * the application is created by calling {@link #createApplication(IotDevice)}
- * passing the {@code IotDevice}. <BR>
- * Then additional independent applications (topologies) can be created and they
- * create a proxy {@code IotDevice} for their topology using
- * {@link #addIotDevice(TopologyElement)}. This proxy {@code IotDevice} is then
- * used to send device events and receive device commands in that topology. <BR>
- * Once all the topologies have been declared they can be submitted.
- * </P>
- * <P>
- * At startup this application sends a single device event with
- * identifier {@link Events#IOT_START}. This performs two functions:
- * </P>
- * <UL>
- * <LI>Initiates a connection to the message hub.</LI>
- * <LI>Allows external applications to be notified (by subscribing to device events)
- * when an Edgent provider starts.</LI>
- * </UL>
- * 
- * @see PublishSubscribe
- */
-public class IotDevicePubSub {
-    
-    /**
-     * IotDevicePubSub application name.
-     */
-    public static final String APP_NAME = SYSTEM_APP_PREFIX + "IotDevicePubSub";
-    
-    /**
-     * Events published to topic {@value} are sent as device events using the
-     * actual message hub {@link IotDevice}. <BR>
-     * it is recommended applications use the {@code IotDevice} returned by
-     * {@link #addIotDevice(TopologyElement)} to send events rather than
-     * publishing streams to this topic.
-     */
-    public static final String EVENTS_TOPIC = PublishSubscribe.RESERVED_TOPIC_PREFIX + "iot/events";
-    
-    /**
-     * Device commands are published to {@value} by
-     * this application. <BR>
-     * it is recommended applications use the {@code IotDevice} returned by
-     * {@link #addIotDevice(TopologyElement)} to receive commands rather than
-     * subscribing to streams with this topic prefix.
-     */
-    public static final String COMMANDS_TOPIC = PublishSubscribe.RESERVED_TOPIC_PREFIX + "iot/commands";
-
-    /**
-     * Create an instance of this application using {@code device} as the device
-     * connection to a message hub.
-     * 
-     * @param device the IotDevice
-     */
-    public static void createApplication(IotDevice device) {
-
-        TStream<JsonObject> events = PublishSubscribe.subscribe(device, EVENTS_TOPIC, JsonObject.class);
-
-        device.events(events, ew -> ew.getAsJsonPrimitive("eventId").getAsString(), ew -> ew.getAsJsonObject("event"),
-                ew -> ew.getAsJsonPrimitive("qos").getAsInt());
-        
-        PublishSubscribe.publish(device.commands(), COMMANDS_TOPIC, JsonObject.class);
-        
-        // Publish a single event at startup
-        TStream<JsonObject> start = device.topology().of(new JsonObject());
-        device.events(start, Events.IOT_START, QoS.AT_MOST_ONCE);
-    }
-
-    /**
-     * Add a proxy {@code IotDevice} for the topology containing {@code te}.
-     * <P>
-     * Any events sent through the returned device are sent onto the message hub
-     * device through publish-subscribe. <BR>
-     * Subscribing to commands using the returned device will subscribe to
-     * commands received by the message hub device.
-     * </P>
-     * 
-     * @param te
-     *            Topology the returned device is contained in.
-     * @return Proxy device.
-     */
-    public static IotDevice addIotDevice(TopologyElement te) {
-        return new PubSubIotDevice(te.topology());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/edgent/apps/iot/PubSubIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/edgent/apps/iot/PubSubIotDevice.java b/apps/iot/src/main/java/edgent/apps/iot/PubSubIotDevice.java
deleted file mode 100644
index 61af093..0000000
--- a/apps/iot/src/main/java/edgent/apps/iot/PubSubIotDevice.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.apps.iot;
-
-import static edgent.apps.iot.IotDevicePubSub.COMMANDS_TOPIC;
-import static edgent.apps.iot.IotDevicePubSub.EVENTS_TOPIC;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.iot.IotDevice;
-import edgent.connectors.pubsub.PublishSubscribe;
-import edgent.function.Function;
-import edgent.function.UnaryOperator;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-
-
-/**
- * Pub-sub IotDevice that uses publish-subscribe and IotDevicePubSub application
- * to communicate with a single IotDevice connected to a message hub.
- */
-class PubSubIotDevice implements IotDevice {
-
-    private final Topology topology;
-
-    /**
-     * Create a proxy IotDevice
-     * 
-     * @param app
-     *            IotDevicePubSub application hosting the actual IotDevice.
-     * @param topology
-     *            Topology of the subscribing application.
-     */
-    PubSubIotDevice(Topology topology) {
-        this.topology = topology;
-    }
-
-    @Override
-    public final Topology topology() {
-        return topology;
-    }
-
-    /**
-     * Publishes events derived from {@code stream} using the topic
-     * {@link EVENTS} as a JsonObject containing eventId, event,
-     * and qos keys.
-     */
-    @Override
-    public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
-            UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
-
-        stream = stream.map(event -> {
-            JsonObject publishedEvent = new JsonObject();
-
-            publishedEvent.addProperty("eventId", eventId.apply(event));
-            publishedEvent.add("event", payload.apply(event));
-            publishedEvent.addProperty("qos", qos.apply(event));
-
-            return publishedEvent;
-        });
-
-        return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS_TOPIC, JsonObject.class);
-    }
-
-    /**
-     * Publishes events derived from {@code stream} using the topic
-     * {@link IotDevicePubSub#EVENTS} as a JsonObject containing eventId, event,
-     * and qos keys.
-     */
-    @Override
-    public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
-
-        stream = stream.map(event -> {
-            JsonObject publishedEvent = new JsonObject();
-
-            publishedEvent.addProperty("eventId", eventId);
-            publishedEvent.add("event", event);
-            publishedEvent.addProperty("qos", qos);
-
-            return publishedEvent;
-        });
-
-        return PublishSubscribe.publish(stream, EVENTS_TOPIC, JsonObject.class);
-    }
-
-    /**
-     * Subscribes to commands.
-     */
-    @Override
-    public TStream<JsonObject> commands(String... commandIdentifiers) {
-
-        TStream<JsonObject> commandsStream = PublishSubscribe.subscribe(this, COMMANDS_TOPIC, JsonObject.class);
-        
-        if (commandIdentifiers.length > 0) {
-            Set<String> cmdIds = new HashSet<>(Arrays.asList(commandIdentifiers));
-            commandsStream = commandsStream.filter(
-                    cmd -> cmdIds.contains(cmd.get(CMD_ID).getAsString()));
-        }
-
-        return commandsStream;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/edgent/apps/iot/package-info.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/edgent/apps/iot/package-info.java b/apps/iot/src/main/java/edgent/apps/iot/package-info.java
deleted file mode 100644
index cecb456..0000000
--- a/apps/iot/src/main/java/edgent/apps/iot/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Applications for use in an Internet of Things environment.
- */
-package edgent.apps.iot;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/org/apache/edgent/apps/iot/IotDevicePubSub.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/org/apache/edgent/apps/iot/IotDevicePubSub.java b/apps/iot/src/main/java/org/apache/edgent/apps/iot/IotDevicePubSub.java
new file mode 100644
index 0000000..075fd89
--- /dev/null
+++ b/apps/iot/src/main/java/org/apache/edgent/apps/iot/IotDevicePubSub.java
@@ -0,0 +1,128 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.apps.iot;
+
+import static org.apache.edgent.topology.services.ApplicationService.SYSTEM_APP_PREFIX;
+
+import org.apache.edgent.connectors.iot.Events;
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.connectors.iot.QoS;
+import org.apache.edgent.connectors.pubsub.PublishSubscribe;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TopologyElement;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Application sharing an {@code IotDevice} through publish-subscribe. <BR>
+ * This application shares an {@link IotDevice} across multiple running
+ * jobs. This allows a single connection to a back-end message hub to be shared
+ * across multiple independent applications, without having to build a single
+ * topology.
+ * <P>
+ * Applications coded to {@link IotDevice} obtain a topology specific
+ * {@code IotDevice} using {@link #addIotDevice(TopologyElement)}. This returned
+ * device will route events and commands to/from the actual message hub
+ * {@code IotDevice} through publish-subscribe.
+ * <P>
+ * An instance of this application is created by first creating a new topology and
+ * then creating a {@link IotDevice} specific to the desired message hub. Then
+ * the application is created by calling {@link #createApplication(IotDevice)}
+ * passing the {@code IotDevice}. <BR>
+ * Then additional independent applications (topologies) can be created and they
+ * create a proxy {@code IotDevice} for their topology using
+ * {@link #addIotDevice(TopologyElement)}. This proxy {@code IotDevice} is then
+ * used to send device events and receive device commands in that topology. <BR>
+ * Once all the topologies have been declared they can be submitted.
+ * </P>
+ * <P>
+ * At startup this application sends a single device event with
+ * identifier {@link Events#IOT_START}. This performs two functions:
+ * </P>
+ * <UL>
+ * <LI>Initiates a connection to the message hub.</LI>
+ * <LI>Allows external applications to be notified (by subscribing to device events)
+ * when an Edgent provider starts.</LI>
+ * </UL>
+ * 
+ * @see PublishSubscribe
+ */
+public class IotDevicePubSub {
+    
+    /**
+     * IotDevicePubSub application name.
+     */
+    public static final String APP_NAME = SYSTEM_APP_PREFIX + "IotDevicePubSub";
+    
+    /**
+     * Events published to topic {@value} are sent as device events using the
+     * actual message hub {@link IotDevice}. <BR>
+     * it is recommended applications use the {@code IotDevice} returned by
+     * {@link #addIotDevice(TopologyElement)} to send events rather than
+     * publishing streams to this topic.
+     */
+    public static final String EVENTS_TOPIC = PublishSubscribe.RESERVED_TOPIC_PREFIX + "iot/events";
+    
+    /**
+     * Device commands are published to {@value} by
+     * this application. <BR>
+     * it is recommended applications use the {@code IotDevice} returned by
+     * {@link #addIotDevice(TopologyElement)} to receive commands rather than
+     * subscribing to streams with this topic prefix.
+     */
+    public static final String COMMANDS_TOPIC = PublishSubscribe.RESERVED_TOPIC_PREFIX + "iot/commands";
+
+    /**
+     * Create an instance of this application using {@code device} as the device
+     * connection to a message hub.
+     * 
+     * @param device the IotDevice
+     */
+    public static void createApplication(IotDevice device) {
+
+        TStream<JsonObject> events = PublishSubscribe.subscribe(device, EVENTS_TOPIC, JsonObject.class);
+
+        device.events(events, ew -> ew.getAsJsonPrimitive("eventId").getAsString(), ew -> ew.getAsJsonObject("event"),
+                ew -> ew.getAsJsonPrimitive("qos").getAsInt());
+        
+        PublishSubscribe.publish(device.commands(), COMMANDS_TOPIC, JsonObject.class);
+        
+        // Publish a single event at startup
+        TStream<JsonObject> start = device.topology().of(new JsonObject());
+        device.events(start, Events.IOT_START, QoS.AT_MOST_ONCE);
+    }
+
+    /**
+     * Add a proxy {@code IotDevice} for the topology containing {@code te}.
+     * <P>
+     * Any events sent through the returned device are sent onto the message hub
+     * device through publish-subscribe. <BR>
+     * Subscribing to commands using the returned device will subscribe to
+     * commands received by the message hub device.
+     * </P>
+     * 
+     * @param te
+     *            Topology the returned device is contained in.
+     * @return Proxy device.
+     */
+    public static IotDevice addIotDevice(TopologyElement te) {
+        return new PubSubIotDevice(te.topology());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java b/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java
new file mode 100644
index 0000000..0110a01
--- /dev/null
+++ b/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java
@@ -0,0 +1,124 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.apps.iot;
+
+import static org.apache.edgent.apps.iot.IotDevicePubSub.COMMANDS_TOPIC;
+import static org.apache.edgent.apps.iot.IotDevicePubSub.EVENTS_TOPIC;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.connectors.pubsub.PublishSubscribe;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.UnaryOperator;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import com.google.gson.JsonObject;
+
+
+/**
+ * Pub-sub IotDevice that uses publish-subscribe and IotDevicePubSub application
+ * to communicate with a single IotDevice connected to a message hub.
+ */
+class PubSubIotDevice implements IotDevice {
+
+    private final Topology topology;
+
+    /**
+     * Create a proxy IotDevice
+     * 
+     * @param app
+     *            IotDevicePubSub application hosting the actual IotDevice.
+     * @param topology
+     *            Topology of the subscribing application.
+     */
+    PubSubIotDevice(Topology topology) {
+        this.topology = topology;
+    }
+
+    @Override
+    public final Topology topology() {
+        return topology;
+    }
+
+    /**
+     * Publishes events derived from {@code stream} using the topic
+     * {@link EVENTS} as a JsonObject containing eventId, event,
+     * and qos keys.
+     */
+    @Override
+    public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+            UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
+
+        stream = stream.map(event -> {
+            JsonObject publishedEvent = new JsonObject();
+
+            publishedEvent.addProperty("eventId", eventId.apply(event));
+            publishedEvent.add("event", payload.apply(event));
+            publishedEvent.addProperty("qos", qos.apply(event));
+
+            return publishedEvent;
+        });
+
+        return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS_TOPIC, JsonObject.class);
+    }
+
+    /**
+     * Publishes events derived from {@code stream} using the topic
+     * {@link IotDevicePubSub#EVENTS} as a JsonObject containing eventId, event,
+     * and qos keys.
+     */
+    @Override
+    public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
+
+        stream = stream.map(event -> {
+            JsonObject publishedEvent = new JsonObject();
+
+            publishedEvent.addProperty("eventId", eventId);
+            publishedEvent.add("event", event);
+            publishedEvent.addProperty("qos", qos);
+
+            return publishedEvent;
+        });
+
+        return PublishSubscribe.publish(stream, EVENTS_TOPIC, JsonObject.class);
+    }
+
+    /**
+     * Subscribes to commands.
+     */
+    @Override
+    public TStream<JsonObject> commands(String... commandIdentifiers) {
+
+        TStream<JsonObject> commandsStream = PublishSubscribe.subscribe(this, COMMANDS_TOPIC, JsonObject.class);
+        
+        if (commandIdentifiers.length > 0) {
+            Set<String> cmdIds = new HashSet<>(Arrays.asList(commandIdentifiers));
+            commandsStream = commandsStream.filter(
+                    cmd -> cmdIds.contains(cmd.get(CMD_ID).getAsString()));
+        }
+
+        return commandsStream;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/org/apache/edgent/apps/iot/package-info.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/org/apache/edgent/apps/iot/package-info.java b/apps/iot/src/main/java/org/apache/edgent/apps/iot/package-info.java
new file mode 100644
index 0000000..c6aa983
--- /dev/null
+++ b/apps/iot/src/main/java/org/apache/edgent/apps/iot/package-info.java
@@ -0,0 +1,23 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Applications for use in an Internet of Things environment.
+ */
+package org.apache.edgent.apps.iot;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/test/java/edgent/test/apps/iot/EchoIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/edgent/test/apps/iot/EchoIotDevice.java b/apps/iot/src/test/java/edgent/test/apps/iot/EchoIotDevice.java
deleted file mode 100644
index c81edcc..0000000
--- a/apps/iot/src/test/java/edgent/test/apps/iot/EchoIotDevice.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.test.apps.iot;
-
-import static edgent.function.Functions.discard;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.iot.IotDevice;
-import edgent.function.Function;
-import edgent.function.UnaryOperator;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-
-/**
- * A test IotDevice that echos back every event as a command with command
- * identifier equal to the {@code cmdId} value in the event payload. If {@code cmdId}
- * is not set then the event identifier is used.
- *
- */
-public class EchoIotDevice implements IotDevice {
-    
-    public static final String EVENT_CMD_ID = "cmdId";
-
-    private final Topology topology;
-    private TStream<JsonObject> echoCmds;
-
-    public EchoIotDevice(Topology topology) {
-        this.topology = topology;
-    }
-
-    @Override
-    public Topology topology() {
-        return topology;
-    }
-
-    @Override
-    public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
-            UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
-        
-        stream = stream.map(e -> {
-            JsonObject c = new JsonObject();
-            JsonObject evPayload = payload.apply(e);
-            c.addProperty(CMD_ID, getCommandIdFromEvent(eventId.apply(e), evPayload));
-            c.add(CMD_PAYLOAD, evPayload);
-            c.addProperty(CMD_FORMAT, "json");
-            c.addProperty(CMD_TS, System.currentTimeMillis());
-            return c;
-        });
-        
-        return handleEvents(stream);
-    }
-    
-    private static String getCommandIdFromEvent(String eventId, JsonObject evPayload) {
-        if (evPayload.has(EVENT_CMD_ID))
-            return evPayload.getAsJsonPrimitive(EVENT_CMD_ID).getAsString();
-        else
-            return eventId;
-    }
-
-    @Override
-    public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
-        
-        stream = stream.map(e -> {
-            JsonObject c = new JsonObject();
-            c.addProperty(CMD_ID, getCommandIdFromEvent(eventId, e));
-            c.add(CMD_PAYLOAD, e);
-            c.addProperty(CMD_FORMAT, "json");
-            c.addProperty(CMD_TS, System.currentTimeMillis());
-            return c;
-        });
-        
-        return handleEvents(stream);
-    }
-    
-    private TSink<JsonObject> handleEvents(TStream<JsonObject> stream) {
-        
-        if (echoCmds == null)
-            echoCmds = PlumbingStreams.isolate(stream, true);
-        else
-            echoCmds = PlumbingStreams.isolate(stream.union(echoCmds), true);
-        
-        return stream.sink(discard());
-    }
-
-    @Override
-    public TStream<JsonObject> commands(String... commands) {
-        if (commands.length == 0)
-            return echoCmds;
-
-        Set<String> cmds = new HashSet<>(Arrays.asList(commands));
-        return echoCmds.filter(cmd -> cmds.contains(cmd.getAsJsonPrimitive(CMD_ID).getAsString()));
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/test/java/edgent/test/apps/iot/IotDevicePubSubTest.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/edgent/test/apps/iot/IotDevicePubSubTest.java b/apps/iot/src/test/java/edgent/test/apps/iot/IotDevicePubSubTest.java
deleted file mode 100644
index f92b425..0000000
--- a/apps/iot/src/test/java/edgent/test/apps/iot/IotDevicePubSubTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.test.apps.iot;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-import com.google.gson.JsonObject;
-
-import edgent.apps.iot.IotDevicePubSub;
-import edgent.connectors.iot.IotDevice;
-import edgent.connectors.iot.QoS;
-import edgent.connectors.pubsub.service.ProviderPubSub;
-import edgent.connectors.pubsub.service.PublishSubscribeService;
-import edgent.execution.Job;
-import edgent.execution.Job.Action;
-import edgent.providers.direct.DirectProvider;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-import edgent.topology.tester.Condition;
-
-public class IotDevicePubSubTest {
-
-    
-
-    @Test
-    public void testIotDevicePubSubApp() throws Exception {
-        DirectProvider dp = new DirectProvider();
-
-        dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
-        
-        Topology iot = dp.newTopology("IotPubSub");
-        IotDevicePubSub.createApplication(new EchoIotDevice(iot));
-        
-        Topology app1 = dp.newTopology("App1");
-        
-        IotDevice app1Iot = IotDevicePubSub.addIotDevice(app1);
-        
-        TStream<String> data = app1.strings("A", "B", "C");
-        
-        // Without this the tuple can be published and discarded before the
-        // subscriber is hooked up.
-        data = PlumbingStreams.blockingOneShotDelay(data, 500, TimeUnit.MILLISECONDS);
-        
-        TStream<JsonObject> events = data.map(
-                s -> {JsonObject j = new JsonObject(); j.addProperty("v", s); return j;});       
-        app1Iot.events(events, "ps1", QoS.FIRE_AND_FORGET);
-        
-        TStream<JsonObject> echoedCmds = app1Iot.commands("ps1");
-        
-        TStream<String> ecs = echoedCmds.map(j -> j.getAsJsonObject(IotDevice.CMD_PAYLOAD).getAsJsonPrimitive("v").getAsString());
-        Condition<List<String>> tcEcho = app1.getTester().streamContents(ecs, "A", "B", "C"); // Expect all tuples
-        
-        Job jIot = dp.submit(iot.topology()).get();
-        Job jApp = dp.submit(app1.topology()).get();
-
-        for (int i = 0; i < 50 && !tcEcho.valid(); i++) {
-            Thread.sleep(50);
-        }
-
-        assertTrue(tcEcho.getResult().toString(), tcEcho.valid());        
-
-        jIot.stateChange(Action.CLOSE);
-        jApp.stateChange(Action.CLOSE);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java b/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java
new file mode 100644
index 0000000..65b217f
--- /dev/null
+++ b/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.apps.iot;
+
+import static org.apache.edgent.function.Functions.discard;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.UnaryOperator;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+
+import com.google.gson.JsonObject;
+
+/**
+ * A test IotDevice that echos back every event as a command with command
+ * identifier equal to the {@code cmdId} value in the event payload. If {@code cmdId}
+ * is not set then the event identifier is used.
+ *
+ */
+public class EchoIotDevice implements IotDevice {
+    
+    public static final String EVENT_CMD_ID = "cmdId";
+
+    private final Topology topology;
+    private TStream<JsonObject> echoCmds;
+
+    public EchoIotDevice(Topology topology) {
+        this.topology = topology;
+    }
+
+    @Override
+    public Topology topology() {
+        return topology;
+    }
+
+    @Override
+    public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+            UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
+        
+        stream = stream.map(e -> {
+            JsonObject c = new JsonObject();
+            JsonObject evPayload = payload.apply(e);
+            c.addProperty(CMD_ID, getCommandIdFromEvent(eventId.apply(e), evPayload));
+            c.add(CMD_PAYLOAD, evPayload);
+            c.addProperty(CMD_FORMAT, "json");
+            c.addProperty(CMD_TS, System.currentTimeMillis());
+            return c;
+        });
+        
+        return handleEvents(stream);
+    }
+    
+    private static String getCommandIdFromEvent(String eventId, JsonObject evPayload) {
+        if (evPayload.has(EVENT_CMD_ID))
+            return evPayload.getAsJsonPrimitive(EVENT_CMD_ID).getAsString();
+        else
+            return eventId;
+    }
+
+    @Override
+    public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
+        
+        stream = stream.map(e -> {
+            JsonObject c = new JsonObject();
+            c.addProperty(CMD_ID, getCommandIdFromEvent(eventId, e));
+            c.add(CMD_PAYLOAD, e);
+            c.addProperty(CMD_FORMAT, "json");
+            c.addProperty(CMD_TS, System.currentTimeMillis());
+            return c;
+        });
+        
+        return handleEvents(stream);
+    }
+    
+    private TSink<JsonObject> handleEvents(TStream<JsonObject> stream) {
+        
+        if (echoCmds == null)
+            echoCmds = PlumbingStreams.isolate(stream, true);
+        else
+            echoCmds = PlumbingStreams.isolate(stream.union(echoCmds), true);
+        
+        return stream.sink(discard());
+    }
+
+    @Override
+    public TStream<JsonObject> commands(String... commands) {
+        if (commands.length == 0)
+            return echoCmds;
+
+        Set<String> cmds = new HashSet<>(Arrays.asList(commands));
+        return echoCmds.filter(cmd -> cmds.contains(cmd.getAsJsonPrimitive(CMD_ID).getAsString()));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/IotDevicePubSubTest.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/IotDevicePubSubTest.java b/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/IotDevicePubSubTest.java
new file mode 100644
index 0000000..f34de4f
--- /dev/null
+++ b/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/IotDevicePubSubTest.java
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.apps.iot;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.apps.iot.IotDevicePubSub;
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.connectors.iot.QoS;
+import org.apache.edgent.connectors.pubsub.service.ProviderPubSub;
+import org.apache.edgent.connectors.pubsub.service.PublishSubscribeService;
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Job.Action;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+public class IotDevicePubSubTest {
+
+    
+
+    @Test
+    public void testIotDevicePubSubApp() throws Exception {
+        DirectProvider dp = new DirectProvider();
+
+        dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
+        
+        Topology iot = dp.newTopology("IotPubSub");
+        IotDevicePubSub.createApplication(new EchoIotDevice(iot));
+        
+        Topology app1 = dp.newTopology("App1");
+        
+        IotDevice app1Iot = IotDevicePubSub.addIotDevice(app1);
+        
+        TStream<String> data = app1.strings("A", "B", "C");
+        
+        // Without this the tuple can be published and discarded before the
+        // subscriber is hooked up.
+        data = PlumbingStreams.blockingOneShotDelay(data, 500, TimeUnit.MILLISECONDS);
+        
+        TStream<JsonObject> events = data.map(
+                s -> {JsonObject j = new JsonObject(); j.addProperty("v", s); return j;});       
+        app1Iot.events(events, "ps1", QoS.FIRE_AND_FORGET);
+        
+        TStream<JsonObject> echoedCmds = app1Iot.commands("ps1");
+        
+        TStream<String> ecs = echoedCmds.map(j -> j.getAsJsonObject(IotDevice.CMD_PAYLOAD).getAsJsonPrimitive("v").getAsString());
+        Condition<List<String>> tcEcho = app1.getTester().streamContents(ecs, "A", "B", "C"); // Expect all tuples
+        
+        Job jIot = dp.submit(iot.topology()).get();
+        Job jApp = dp.submit(app1.topology()).get();
+
+        for (int i = 0; i < 50 && !tcEcho.valid(); i++) {
+            Thread.sleep(50);
+        }
+
+        assertTrue(tcEcho.getResult().toString(), tcEcho.valid());        
+
+        jIot.stateChange(Action.CLOSE);
+        jApp.stateChange(Action.CLOSE);
+    }
+}