You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:36 UTC
[35/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/test/java/edgent/test/window/StateTest.java
----------------------------------------------------------------------
diff --git a/api/window/src/test/java/edgent/test/window/StateTest.java b/api/window/src/test/java/edgent/test/window/StateTest.java
deleted file mode 100644
index 6c459f9..0000000
--- a/api/window/src/test/java/edgent/test/window/StateTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.window;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-import edgent.function.Supplier;
-import edgent.window.PartitionedState;
-
-
-public class StateTest {
-
- /**
- * Test PartitionedState with immutable state.
- */
- @Test
- public void partitionedImmutableStateTest() {
-
- TestState<Integer> state = new TestState<>(() -> 73);
-
- assertEquals(73, state.getState("A").intValue());
- assertEquals(73, state.getState("B").intValue());
-
- assertEquals(73, state.removeState("A").intValue());
- // and it reverts back to the initial value.
- assertEquals(73, state.getState("A").intValue());
-
- assertEquals(73, state.setState("B", 102).intValue());
- assertEquals(102, state.getState("B").intValue());
-
- assertEquals(73, state.getState("A").intValue());
- }
-
- /**
- * Test PartitionedState with mutable state, basically
- * checking that the state does not get confused.
- */
- @Test
- public void partitionedMutableStateTest() {
-
- TestState<int[]> state = new TestState<>(() -> new int[1]);
-
- assertEquals(0, state.getState("A")[0]);
- assertEquals(0, state.getState("B")[0]);
-
- // change A, must not change B
- state.getState("A")[0] = 73;
- assertEquals(73, state.getState("A")[0]);
- assertEquals(0, state.getState("B")[0]);
-
- // change B, must not change A
- state.getState("B")[0] = 102;
- assertEquals(73, state.getState("A")[0]);
- assertEquals(102, state.getState("B")[0]);
-
- assertEquals(73, state.removeState("A")[0]);
- assertEquals(0, state.getState("A")[0]);
-
- int[] newB = new int[1];
- newB[0] = 9214;
- assertEquals(102, state.setState("B", newB)[0]);
- assertEquals(9214, state.getState("B")[0]);
- }
-
-
- private static class TestState<S> extends PartitionedState<String, S> {
-
- protected TestState(Supplier<S> initialState) {
- super(initialState);
- }
- @Override
- public S getState(String key) {
- return super.getState(key);
- }
- @Override
- public S removeState(String key) {
- return super.removeState(key);
- }
- @Override
- public S setState(String key, S state) {
- return super.setState(key, state);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/test/java/edgent/test/window/WindowTest.java
----------------------------------------------------------------------
diff --git a/api/window/src/test/java/edgent/test/window/WindowTest.java b/api/window/src/test/java/edgent/test/window/WindowTest.java
deleted file mode 100644
index 66cc1ed59..0000000
--- a/api/window/src/test/java/edgent/test/window/WindowTest.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.window;
-
-import static edgent.function.Functions.unpartitioned;
-import static edgent.window.Policies.alwaysInsert;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-import edgent.function.BiConsumer;
-import edgent.window.InsertionTimeList;
-import edgent.window.Policies;
-import edgent.window.Window;
-import edgent.window.Windows;
-
-
-public class WindowTest {
-
- /**
- * Verifies that the state of the window is correct after each tuple offer.
- */
- @Test
- public void lastCountTest(){
- final int COUNT = 100;
- // The window implementation
- Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, unpartitioned());
- // The states of the window as it slides
- LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
-
- // A processor that records the states of the window
- BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
- incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
- };
- window.registerPartitionProcessor(wp);
-
- // Generate sliding window correct incremental state to compare
- // against the window's
- LinkedList<LinkedList<Integer>> correctWindowStates = new LinkedList<>();
- LinkedList<Integer> previous = null;
- LinkedList<Integer> current = null;
- for(int i = 0; i < COUNT; i++){
- current = new LinkedList<>();
- if(previous != null)
- current.addAll(previous);
-
- current.addLast(i);
- if(current.size() > 10){
- current.removeFirst();
- }
- previous = current;
- correctWindowStates.addLast(current);
- }
-
- // Add tuples to window, populating the incrementalWindowStates list.
- for(int i = 0; i < COUNT; i++){
- window.insert(i);
- }
-
- // Compare correct window states to the window implementation's
- assertTrue(correctWindowStates.size() == incrementalWindowStates.size());
- for(int i = 0; i < correctWindowStates.size(); i++){
- assertTrue(correctWindowStates.get(i).containsAll(incrementalWindowStates.get(i)));
- assertTrue(incrementalWindowStates.get(i).containsAll(correctWindowStates.get(i)));
- }
- }
-
- @Test
- public void keyedWindowTest(){
- final int COUNT = 1000;
- // The window implementation
- // The window implementation
- Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, tuple->tuple%10);
-
-
- // The states of the window as it slides
- LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
-
- // A processor that records the states of the window
- BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
- incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
- };
- window.registerPartitionProcessor(wp);
-
- Map<Integer, LinkedList<Integer>> correctPartitionedStates = new HashMap<>();
- List<List<Integer> > correctWindowStates = new ArrayList<>();
- for(int i = 0; i < 10; i++){
- correctPartitionedStates.put(i, new LinkedList<>());
- }
- for(int i = 0; i < COUNT; i++){
- correctPartitionedStates.get(i%10).add(i);
- if(correctPartitionedStates.get(i%10).size() > 10){
- correctPartitionedStates.get(i%10).removeFirst();
- }
- correctWindowStates.add(new ArrayList<>(correctPartitionedStates.get(i%10)));
- window.insert(i);
- }
-
- // Compare correct window states to the window implementation's
- assertTrue(correctWindowStates.size() == incrementalWindowStates.size());
- for(int i = 0; i < correctWindowStates.size(); i++){
- assertTrue(correctWindowStates.get(i).containsAll(incrementalWindowStates.get(i)));
- assertTrue(incrementalWindowStates.get(i).containsAll(correctWindowStates.get(i)));
- }
- }
-
- @Test
- public void accessPartitionKeyTest(){
- LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
-
- Window<Integer, Integer, ? extends List<Integer>> window = Windows.<Integer, Integer, LinkedList<Integer>>window(
- (partition, tuple) -> {
- if (partition.getKey().equals(1) || partition.getKey().equals(3)) {
- return false;
- }
- return true;
- },
- (partition, tuple) -> { // Contents policy
-
- },
- (partition) -> { // Evict determiner
- partition.getContents().clear();
- },
- Policies.processOnInsert(),
- tuple -> tuple,
- () -> new LinkedList<Integer>());
-
- // A processor that records the states of the window
- BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
- incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
- };
- window.registerPartitionProcessor(wp);
-
-
- for(Integer i = 0; i < 5; i++){
- window.insert(i);
- }
-
- assertTrue(incrementalWindowStates.size() == 3);
- assertTrue(incrementalWindowStates.get(0).get(0)==0);
- assertTrue(incrementalWindowStates.get(1).get(0)==2);
- assertTrue(incrementalWindowStates.get(2).get(0)==4);
-
- }
-
- @Test
- public void concurrentWindowAccessTest() throws InterruptedException {
-
- Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, tuple -> 0);
-
- window.registerPartitionProcessor((tuples, key) -> {
- // ensure that the window state doesn't change after .05 seconds
- // Copy window state
- LinkedList<Integer> list_copy = new LinkedList<Integer>(tuples);
-
- // Wait .05 seconds
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- // Verify that the partition is unchanged
- assertTrue(list_copy.containsAll(tuples));
- assertTrue(tuples.containsAll(list_copy));
- });
-
-
- // Run for five seconds.
- long finishTime = System.currentTimeMillis() + 3000;
-
- List<Thread> threads = new ArrayList<Thread>();
-
- // Ten threads concurrently attempt to insert tuples into the window
- for(int i = 0; i < 10; i++){
- threads.add(new Thread(new Runnable(){
- Random r = new Random();
- @Override
- public void run() {
-
- while(System.currentTimeMillis() < finishTime){
- try{
- window.insert(r.nextInt());
- }
- catch(ConcurrentModificationException cme){
- org.junit.Assert.fail("State of window changed while processing");
- }
- }
- }
-
- }));
- }
- for(Thread thread : threads){
- thread.start();
- Thread.sleep(10);
- }
- for(Thread thread : threads)
- thread.join();
- }
-
-
- @Test
- public void noWaitConcurrentWindowAccessTest() throws InterruptedException {
- Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(100, tuple -> 0);
- window.registerPartitionProcessor((tuples, key) -> {});
- long finishTime = System.currentTimeMillis() + 3000;
-
- List<Thread> threads = new ArrayList<Thread>();
-
- // Ten threads concurrently attempt to insert tuples into the window
- for(int i = 0; i < 10; i++){
- threads.add(new Thread(new Runnable(){
- Random r = new Random();
- @Override
- public void run() {
- while(System.currentTimeMillis() < finishTime){
- try{
- window.insert(r.nextInt());
- }
- catch(ConcurrentModificationException cme){
- org.junit.Assert.fail("State of window changed while processing");
- }
- }
- }
-
- }));
- }
- for(Thread thread : threads){
- thread.start();
- Thread.sleep(10);
- }
- for(Thread thread : threads)
- thread.join();
- }
-
- @Test
- public void timeActionTest() throws InterruptedException {
- // Timing variances on shared machines can cause this test to fail
- assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
- List<Long> diffs = Collections.synchronizedList(new ArrayList<>());
- List<Boolean> initialized = Collections.synchronizedList(new ArrayList<>());
- initialized.add(false);
-
- Window<Long, Integer, InsertionTimeList<Long>> window = // new TimeWindowImpl<Long, Integer, LinkedList<Long>>(
- Windows.window(
- Policies.alwaysInsert(), // insertion policy
- Policies.scheduleEvictIfEmpty(1000, TimeUnit.MILLISECONDS),
- // Policies.evictOlderThan(1000, TimeUnit.MILLISECONDS),
- Policies.evictOlderWithProcess(1000, TimeUnit.MILLISECONDS),
- (partition, tuple) -> {
- if(initialized.get(0).booleanValue() == false){
- initialized.set(0, true);
- ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
- ses.scheduleAtFixedRate(() -> {partition.process();}, 0, 1000, TimeUnit.MILLISECONDS);
- }},
- unpartitioned(),
- () -> new InsertionTimeList<Long>());
-
- window.registerPartitionProcessor((tuples, key) -> {
- if(tuples.size() > 1)
- diffs.add(tuples.get(tuples.size()-1) - tuples.get(0));
- });
-
- window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
-
- long endTime = System.currentTimeMillis() + 8000;
- List<Thread> threads = new ArrayList<>();
- int NUM_THREADS = 10;
- // Create 10 threads. Each inserts at 1,000 Hz
- for(int i = 0; i < NUM_THREADS; i++){
- threads.add(new Thread(new Runnable() {
- @Override
- public void run() {
- while(System.currentTimeMillis() < endTime){
- window.insert(System.currentTimeMillis());
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }));
- }
- for(int i = 0; i < NUM_THREADS; i++){
- threads.get(i).start();
- }
- for(int i = 0; i < NUM_THREADS; i++){
- threads.get(i).join();
- }
- assertOnTimeEvictions(diffs);
-
- }
-
- @SuppressWarnings("serial")
- @Test
- public void countBatchWindowTest(){
- List<Integer> numBatches = new LinkedList<>();
- Window<Integer, Integer, List<Integer>> window =
- Windows.window(
- alwaysInsert(),
- Policies.doNothing(),
- Policies.evictAll(),
- Policies.processWhenFullAndEvict(113),
- tuple -> 0,
- () -> new ArrayList<Integer>());
- window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
- @Override
- public void accept(List<Integer> t, Integer u) {
- numBatches.add(1);
- }
- });
- for(int i = 0; i < 1000; i++){
- window.insert(i);
- }
-
- assertTrue(numBatches.size() == 8);
- }
-
- @SuppressWarnings("serial")
- @Test
- public void timeBatchWindowTest() throws InterruptedException{
- List<Long> numBatches = new LinkedList<>();
-
- ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
- Window<Integer, Integer, List<Integer>> window =
- Windows.window(
- alwaysInsert(),
- Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
- Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
- (partiton, tuple) -> {},
- tuple -> 0,
- () -> new ArrayList<Integer>());
-
- window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
- @Override
- public void accept(List<Integer> t, Integer u) {
- numBatches.add((long)t.size());
- }
- });
-
- window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
-
- ses.scheduleAtFixedRate(() -> {
- window.insert(1);
- }, 0, 10, TimeUnit.MILLISECONDS);
-
- Thread.sleep(11000);
- double tolerance = .08;
- for(int i = 0; i < numBatches.size(); i++){
- assertTrue("Values:" + numBatches.toString(), withinTolerance(100.0, numBatches.get(i).doubleValue(), tolerance));
- }
- }
-
- @SuppressWarnings("serial")
- @Test
- public void keyedTimeBatchWindowTest() throws InterruptedException{
- Map<Integer, List<Long> > numBatches = Collections.synchronizedMap(new HashMap<>());
- for(int i = 0; i < 5; i++)
- numBatches.put(i, new LinkedList<Long>());
-
- ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
- Window<Integer, Integer, List<Integer>> window =
- Windows.window(
- alwaysInsert(),
- Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
- Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
- (partiton, tuple) -> {},
- tuple -> tuple,
- () -> new ArrayList<Integer>());
-
- window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
- @Override
- public void accept(List<Integer> t, Integer u) {
- List<Long> l = numBatches.get(u);
- l.add((long)t.size());
- }
- });
-
- window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
-
- int[] count = {1};
- ses.scheduleAtFixedRate(() -> {
- window.insert(count[0]++ % 5);
- }, 0, 1, TimeUnit.MILLISECONDS);
-
- Thread.sleep(11000);
- double tolerance = .08;
-
- for(Integer key : numBatches.keySet()){
- List<Long> batch = numBatches.get(key);
- for(Long l : batch){
- assertTrue("Values:" + batch.toString(), withinTolerance(200.0, l.doubleValue(), tolerance));
- }
- }
- }
-
-
- @SuppressWarnings("serial")
- @Test
- public void timeBatchEnsureUnique() throws InterruptedException{
- List<List<Integer>> batches = Collections.synchronizedList(new LinkedList<>());
-
- ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
- Window<Integer, Integer, List<Integer>> window =
- Windows.window(
- alwaysInsert(),
- Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
- Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
- (partiton, tuple) -> {},
- tuple -> 0,
- () -> new ArrayList<Integer>());
-
- window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
- @Override
- public void accept(List<Integer> t, Integer u) {
- batches.add(new ArrayList<Integer>(t));
- }
- });
-
- window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
-
- ses.scheduleAtFixedRate(new Runnable(){
- private int count = 0;
- @Override
- public void run() {
- if(count < 1000){
- window.insert(count++);
- }
- }
-
- }, 0, 10, TimeUnit.MILLISECONDS);
-
- Thread.sleep(11000);
- int numTuples = 0;
- for(int i = 0; i < batches.size() - 1; i++){
- List<Integer> batch = batches.get(i);
- numTuples += batch.size();
- for(int j = i + 1; j < batches.size(); j++){
- assertTrue("Batches have overlapping tuples", Collections.disjoint(batches.get(i), batches.get(j)));
- }
- }
-
- numTuples += batches.get(batches.size() -1).size();
- assertTrue("Number of tuples submitted (1000) != number of tuples processed in batch (" + numTuples + ")", numTuples == 1000);
- }
-
- private void assertOnTimeEvictions(List<Long> diffs) {
- double tolerance = .08;
- for(int i = 1; i < diffs.size(); i++){
- assertTrue(withinTolerance(1000.0, diffs.get(i).doubleValue(), tolerance));
- }
-
- }
-
- public static boolean withinTolerance(double expected, Double actual, double tolerance) {
- double lowBound = (1.0 - tolerance) * expected;
- double highBound = (1.0 + tolerance) * expected;
- return (actual < highBound && actual > lowBound);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/test/java/org/apache/edgent/test/window/StateTest.java
----------------------------------------------------------------------
diff --git a/api/window/src/test/java/org/apache/edgent/test/window/StateTest.java b/api/window/src/test/java/org/apache/edgent/test/window/StateTest.java
new file mode 100644
index 0000000..f324152
--- /dev/null
+++ b/api/window/src/test/java/org/apache/edgent/test/window/StateTest.java
@@ -0,0 +1,101 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.window;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.window.PartitionedState;
+import org.junit.Test;
+
+
+public class StateTest {
+
+ /**
+ * Test PartitionedState with immutable state.
+ */
+ @Test
+ public void partitionedImmutableStateTest() {
+
+ TestState<Integer> state = new TestState<>(() -> 73);
+
+ assertEquals(73, state.getState("A").intValue());
+ assertEquals(73, state.getState("B").intValue());
+
+ assertEquals(73, state.removeState("A").intValue());
+ // and it reverts back to the initial value.
+ assertEquals(73, state.getState("A").intValue());
+
+ assertEquals(73, state.setState("B", 102).intValue());
+ assertEquals(102, state.getState("B").intValue());
+
+ assertEquals(73, state.getState("A").intValue());
+ }
+
+ /**
+ * Test PartitionedState with mutable state, basically
+ * checking that the state does not get confused.
+ */
+ @Test
+ public void partitionedMutableStateTest() {
+
+ TestState<int[]> state = new TestState<>(() -> new int[1]);
+
+ assertEquals(0, state.getState("A")[0]);
+ assertEquals(0, state.getState("B")[0]);
+
+ // change A, must not change B
+ state.getState("A")[0] = 73;
+ assertEquals(73, state.getState("A")[0]);
+ assertEquals(0, state.getState("B")[0]);
+
+ // change B, must not change A
+ state.getState("B")[0] = 102;
+ assertEquals(73, state.getState("A")[0]);
+ assertEquals(102, state.getState("B")[0]);
+
+ assertEquals(73, state.removeState("A")[0]);
+ assertEquals(0, state.getState("A")[0]);
+
+ int[] newB = new int[1];
+ newB[0] = 9214;
+ assertEquals(102, state.setState("B", newB)[0]);
+ assertEquals(9214, state.getState("B")[0]);
+ }
+
+
+ private static class TestState<S> extends PartitionedState<String, S> {
+
+ protected TestState(Supplier<S> initialState) {
+ super(initialState);
+ }
+ @Override
+ public S getState(String key) {
+ return super.getState(key);
+ }
+ @Override
+ public S removeState(String key) {
+ return super.removeState(key);
+ }
+ @Override
+ public S setState(String key, S state) {
+ return super.setState(key, state);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java
----------------------------------------------------------------------
diff --git a/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java b/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java
new file mode 100644
index 0000000..f075b07
--- /dev/null
+++ b/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java
@@ -0,0 +1,491 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.window;
+
+import static org.apache.edgent.function.Functions.unpartitioned;
+import static org.apache.edgent.window.Policies.alwaysInsert;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.window.InsertionTimeList;
+import org.apache.edgent.window.Policies;
+import org.apache.edgent.window.Window;
+import org.apache.edgent.window.Windows;
+import org.junit.Test;
+
+
+public class WindowTest {
+
+ /**
+ * Verifies that the state of the window is correct after each tuple offer.
+ */
+ @Test
+ public void lastCountTest(){
+ final int COUNT = 100;
+ // The window implementation
+ Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, unpartitioned());
+ // The states of the window as it slides
+ LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
+
+ // A processor that records the states of the window
+ BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
+ incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
+ };
+ window.registerPartitionProcessor(wp);
+
+ // Generate sliding window correct incremental state to compare
+ // against the window's
+ LinkedList<LinkedList<Integer>> correctWindowStates = new LinkedList<>();
+ LinkedList<Integer> previous = null;
+ LinkedList<Integer> current = null;
+ for(int i = 0; i < COUNT; i++){
+ current = new LinkedList<>();
+ if(previous != null)
+ current.addAll(previous);
+
+ current.addLast(i);
+ if(current.size() > 10){
+ current.removeFirst();
+ }
+ previous = current;
+ correctWindowStates.addLast(current);
+ }
+
+ // Add tuples to window, populating the incrementalWindowStates list.
+ for(int i = 0; i < COUNT; i++){
+ window.insert(i);
+ }
+
+ // Compare correct window states to the window implementation's
+ assertTrue(correctWindowStates.size() == incrementalWindowStates.size());
+ for(int i = 0; i < correctWindowStates.size(); i++){
+ assertTrue(correctWindowStates.get(i).containsAll(incrementalWindowStates.get(i)));
+ assertTrue(incrementalWindowStates.get(i).containsAll(correctWindowStates.get(i)));
+ }
+ }
+
+ @Test
+ public void keyedWindowTest(){
+ final int COUNT = 1000;
+ // The window implementation
+ // keyed by tuple % 10, so tuples fall into ten partitions
+ Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, tuple->tuple%10);
+
+
+ // The states of the window as it slides
+ LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
+
+ // A processor that records the states of the window
+ BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
+ incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
+ };
+ window.registerPartitionProcessor(wp);
+
+ Map<Integer, LinkedList<Integer>> correctPartitionedStates = new HashMap<>();
+ List<List<Integer> > correctWindowStates = new ArrayList<>();
+ for(int i = 0; i < 10; i++){
+ correctPartitionedStates.put(i, new LinkedList<>());
+ }
+ for(int i = 0; i < COUNT; i++){
+ correctPartitionedStates.get(i%10).add(i);
+ if(correctPartitionedStates.get(i%10).size() > 10){
+ correctPartitionedStates.get(i%10).removeFirst();
+ }
+ correctWindowStates.add(new ArrayList<>(correctPartitionedStates.get(i%10)));
+ window.insert(i);
+ }
+
+ // Compare correct window states to the window implementation's
+ assertTrue(correctWindowStates.size() == incrementalWindowStates.size());
+ for(int i = 0; i < correctWindowStates.size(); i++){
+ assertTrue(correctWindowStates.get(i).containsAll(incrementalWindowStates.get(i)));
+ assertTrue(incrementalWindowStates.get(i).containsAll(correctWindowStates.get(i)));
+ }
+ }
+
+ @Test
+ public void accessPartitionKeyTest(){
+ LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
+
+ Window<Integer, Integer, ? extends List<Integer>> window = Windows.<Integer, Integer, LinkedList<Integer>>window(
+ (partition, tuple) -> {
+ if (partition.getKey().equals(1) || partition.getKey().equals(3)) {
+ return false;
+ }
+ return true;
+ },
+ (partition, tuple) -> { // Contents policy
+
+ },
+ (partition) -> { // Evict determiner
+ partition.getContents().clear();
+ },
+ Policies.processOnInsert(),
+ tuple -> tuple,
+ () -> new LinkedList<Integer>());
+
+ // A processor that records the states of the window
+ BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
+ incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
+ };
+ window.registerPartitionProcessor(wp);
+
+
+ for(Integer i = 0; i < 5; i++){
+ window.insert(i);
+ }
+
+ assertTrue(incrementalWindowStates.size() == 3);
+ assertTrue(incrementalWindowStates.get(0).get(0)==0);
+ assertTrue(incrementalWindowStates.get(1).get(0)==2);
+ assertTrue(incrementalWindowStates.get(2).get(0)==4);
+
+ }
+
+ @Test
+ public void concurrentWindowAccessTest() throws InterruptedException {
+
+ Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, tuple -> 0);
+
+ window.registerPartitionProcessor((tuples, key) -> {
+ // ensure that the window state doesn't change after .05 seconds
+ // Copy window state
+ LinkedList<Integer> list_copy = new LinkedList<Integer>(tuples);
+
+ // Wait .05 seconds
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ // Verify that the partition is unchanged
+ assertTrue(list_copy.containsAll(tuples));
+ assertTrue(tuples.containsAll(list_copy));
+ });
+
+
+ // Run for three seconds.
+ long finishTime = System.currentTimeMillis() + 3000;
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+ // Ten threads concurrently attempt to insert tuples into the window
+ for(int i = 0; i < 10; i++){
+ threads.add(new Thread(new Runnable(){
+ Random r = new Random();
+ @Override
+ public void run() {
+
+ while(System.currentTimeMillis() < finishTime){
+ try{
+ window.insert(r.nextInt());
+ }
+ catch(ConcurrentModificationException cme){
+ org.junit.Assert.fail("State of window changed while processing");
+ }
+ }
+ }
+
+ }));
+ }
+ for(Thread thread : threads){
+ thread.start();
+ Thread.sleep(10);
+ }
+ for(Thread thread : threads)
+ thread.join();
+ }
+
+
+ @Test
+ public void noWaitConcurrentWindowAccessTest() throws InterruptedException {
+ Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(100, tuple -> 0);
+ window.registerPartitionProcessor((tuples, key) -> {});
+ long finishTime = System.currentTimeMillis() + 3000;
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+ // Ten threads concurrently attempt to insert tuples into the window
+ for(int i = 0; i < 10; i++){
+ threads.add(new Thread(new Runnable(){
+ Random r = new Random();
+ @Override
+ public void run() {
+ while(System.currentTimeMillis() < finishTime){
+ try{
+ window.insert(r.nextInt());
+ }
+ catch(ConcurrentModificationException cme){
+ org.junit.Assert.fail("State of window changed while processing");
+ }
+ }
+ }
+
+ }));
+ }
+ for(Thread thread : threads){
+ thread.start();
+ Thread.sleep(10);
+ }
+ for(Thread thread : threads)
+ thread.join();
+ }
+
+ @Test
+ public void timeActionTest() throws InterruptedException {
+ // Timing variances on shared machines can cause this test to fail; it is skipped when system property edgent.build.ci is true
+ assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
+ // diffs: for each processing call, last tuple minus first tuple; tuples are insertion timestamps, so entries are spans in milliseconds.
+ List<Long> diffs = Collections.synchronizedList(new ArrayList<>());
+ List<Boolean> initialized = Collections.synchronizedList(new ArrayList<>()); // one-element list used as a mutable flag inside the lambda
+ initialized.add(false);
+ // NOTE(review): the get()/set() pair on 'initialized' below is not atomic; whether inserts are serialized around it is not visible here - confirm.
+ Window<Long, Integer, InsertionTimeList<Long>> window = // new TimeWindowImpl<Long, Integer, LinkedList<Long>>(
+ Windows.window(
+ Policies.alwaysInsert(), // insertion policy
+ Policies.scheduleEvictIfEmpty(1000, TimeUnit.MILLISECONDS),
+ // Policies.evictOlderThan(1000, TimeUnit.MILLISECONDS),
+ Policies.evictOlderWithProcess(1000, TimeUnit.MILLISECONDS),
+ (partition, tuple) -> { // on the first call only: start calling partition.process() every 1000 ms
+ if(initialized.get(0).booleanValue() == false){
+ initialized.set(0, true);
+ ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
+ ses.scheduleAtFixedRate(() -> {partition.process();}, 0, 1000, TimeUnit.MILLISECONDS);
+ }},
+ unpartitioned(),
+ () -> new InsertionTimeList<Long>());
+ // Whenever a partition is processed with more than one tuple, record the span between its first and last tuple.
+ window.registerPartitionProcessor((tuples, key) -> {
+ if(tuples.size() > 1)
+ diffs.add(tuples.get(tuples.size()-1) - tuples.get(0));
+ });
+ // NOTE(review): the executor registered here is never shut down in this method.
+ window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
+
+ long endTime = System.currentTimeMillis() + 8000; // writer threads run for 8 seconds
+ List<Thread> threads = new ArrayList<>();
+ int NUM_THREADS = 10;
+ // Create 10 threads. Each inserts the current time and then sleeps 1 ms (at most about 1,000 inserts per second per thread)
+ for(int i = 0; i < NUM_THREADS; i++){
+ threads.add(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while(System.currentTimeMillis() < endTime){
+ window.insert(System.currentTimeMillis());
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace(); // the interrupt is only printed; the loop continues until endTime
+ }
+ }
+ }
+ }));
+ }
+ for(int i = 0; i < NUM_THREADS; i++){
+ threads.get(i).start();
+ }
+ for(int i = 0; i < NUM_THREADS; i++){
+ threads.get(i).join();
+ }
+ assertOnTimeEvictions(diffs); // every recorded span after the first must be within 8% of 1000 ms
+
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void countBatchWindowTest(){ // count-based batches: 1000 inserts against a batch size of 113
+ List<Integer> numBatches = new LinkedList<>(); // one element is appended per processor invocation
+ Window<Integer, Integer, List<Integer>> window =
+ Windows.window(
+ alwaysInsert(),
+ Policies.doNothing(),
+ Policies.evictAll(),
+ Policies.processWhenFullAndEvict(113),
+ tuple -> 0, // the same value (0) is produced for every tuple
+ () -> new ArrayList<Integer>());
+ window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
+ @Override
+ public void accept(List<Integer> t, Integer u) {
+ numBatches.add(1);
+ }
+ });
+ for(int i = 0; i < 1000; i++){
+ window.insert(i);
+ }
+ // 1000 = 8 * 113 + 96: eight processor calls are expected; the remaining 96 tuples presumably never fill a batch.
+ assertTrue(numBatches.size() == 8);
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void timeBatchWindowTest() throws InterruptedException{
+ List<Long> numBatches = new LinkedList<>(); // size of each processed batch; NOTE(review): plain LinkedList, confirm which thread runs the processor
+ // Time-based batches: one tuple is inserted every 10 ms and each processed batch is expected to hold about 100 tuples.
+ ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
+ Window<Integer, Integer, List<Integer>> window =
+ Windows.window(
+ alwaysInsert(),
+ Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
+ Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
+ (partiton, tuple) -> {}, // this policy does nothing
+ tuple -> 0, // the same value (0) is produced for every tuple
+ () -> new ArrayList<Integer>());
+ // Record how many tuples each processed batch contains.
+ window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
+ @Override
+ public void accept(List<Integer> t, Integer u) {
+ numBatches.add((long)t.size());
+ }
+ });
+ // NOTE(review): neither the executor registered below nor 'ses' is shut down in this method.
+ window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
+ // Insert the constant tuple 1 every 10 ms, starting immediately.
+ ses.scheduleAtFixedRate(() -> {
+ window.insert(1);
+ }, 0, 10, TimeUnit.MILLISECONDS);
+ // Let the window run for 11 seconds before checking the recorded batch sizes.
+ Thread.sleep(11000);
+ double tolerance = .08; // 8%: accepted batch sizes lie strictly between roughly 92 and 108
+ for(int i = 0; i < numBatches.size(); i++){
+ assertTrue("Values:" + numBatches.toString(), withinTolerance(100.0, numBatches.get(i).doubleValue(), tolerance));
+ }
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void keyedTimeBatchWindowTest() throws InterruptedException{
+ Map<Integer, List<Long> > numBatches = Collections.synchronizedMap(new HashMap<>()); // key -> sizes of the batches processed for that key
+ for(int i = 0; i < 5; i++)
+ numBatches.put(i, new LinkedList<Long>());
+ // Keyed variant of the time batch test: tuples 0..4 are used as their own keys and batch sizes are recorded per key.
+ ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
+ Window<Integer, Integer, List<Integer>> window =
+ Windows.window(
+ alwaysInsert(),
+ Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
+ Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
+ (partiton, tuple) -> {}, // this policy does nothing
+ tuple -> tuple, // the tuple value itself is returned
+ () -> new ArrayList<Integer>());
+ // Record the size of every processed batch under the key it was processed for.
+ window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
+ @Override
+ public void accept(List<Integer> t, Integer u) {
+ List<Long> l = numBatches.get(u); // lists for keys 0-4 were created above
+ l.add((long)t.size());
+ }
+ });
+ // NOTE(review): neither the executor registered below nor 'ses' is shut down in this method.
+ window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
+ // One insert per millisecond cycling through 1, 2, 3, 4, 0, ...: each value is inserted once every 5 ms.
+ int[] count = {1}; // single-element array so the lambda can mutate the counter
+ ses.scheduleAtFixedRate(() -> {
+ window.insert(count[0]++ % 5);
+ }, 0, 1, TimeUnit.MILLISECONDS);
+ // Let the window run for 11 seconds before checking the recorded batch sizes.
+ Thread.sleep(11000);
+ double tolerance = .08;
+ // Every recorded batch size must be strictly within 8% of 200.
+ for(Integer key : numBatches.keySet()){
+ List<Long> batch = numBatches.get(key);
+ for(Long l : batch){
+ assertTrue("Values:" + batch.toString(), withinTolerance(200.0, l.doubleValue(), tolerance));
+ }
+ }
+ }
+
+
+ @SuppressWarnings("serial")
+ @Test
+ public void timeBatchEnsureUnique() throws InterruptedException{
+ List<List<Integer>> batches = Collections.synchronizedList(new LinkedList<>()); // copy of every processed batch
+ // Inserts the distinct values 0..999 and checks that processed batches never overlap and together contain 1000 tuples.
+ ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
+ Window<Integer, Integer, List<Integer>> window =
+ Windows.window(
+ alwaysInsert(),
+ Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
+ Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
+ (partiton, tuple) -> {}, // this policy does nothing
+ tuple -> 0, // the same value (0) is produced for every tuple
+ () -> new ArrayList<Integer>());
+ // Keep a snapshot of each batch handed to the processor.
+ window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
+ @Override
+ public void accept(List<Integer> t, Integer u) {
+ batches.add(new ArrayList<Integer>(t)); // copy, so later changes to t cannot alter the recorded batch
+ }
+ });
+ // NOTE(review): neither the executor registered below nor 'ses' is shut down in this method.
+ window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
+ // Insert 0, 1, ..., 999 at 10 ms intervals (about 10 seconds in total); runs after the 1000th insert do nothing.
+ ses.scheduleAtFixedRate(new Runnable(){
+ private int count = 0;
+ @Override
+ public void run() {
+ if(count < 1000){
+ window.insert(count++);
+ }
+ }
+
+ }, 0, 10, TimeUnit.MILLISECONDS);
+ // Wait 11 seconds, longer than the roughly 10 seconds the inserts take.
+ Thread.sleep(11000);
+ int numTuples = 0;
+ for(int i = 0; i < batches.size() - 1; i++){ // every batch except the last; the last one is counted after the loop
+ List<Integer> batch = batches.get(i);
+ numTuples += batch.size();
+ for(int j = i + 1; j < batches.size(); j++){
+ assertTrue("Batches have overlapping tuples", Collections.disjoint(batches.get(i), batches.get(j)));
+ }
+ }
+ // NOTE(review): the get() below throws if no batch was recorded at all (batches.size() - 1 is then -1).
+ numTuples += batches.get(batches.size() -1).size();
+ assertTrue("Number of tuples submitted (1000) != number of tuples processed in batch (" + numTuples + ")", numTuples == 1000);
+ }
+
+ private void assertOnTimeEvictions(List<Long> diffs) { // asserts that the recorded values are close to 1000
+ double tolerance = .08; // 8% either side of the expected value
+ for(int i = 1; i < diffs.size(); i++){ // starts at index 1: the first recorded value is not checked
+ assertTrue(withinTolerance(1000.0, diffs.get(i).doubleValue(), tolerance));
+ }
+
+ }
+
+ public static boolean withinTolerance(double expected, Double actual, double tolerance) { // tolerance is a fraction of expected, e.g. .08 for 8%
+ double lowBound = (1.0 - tolerance) * expected;
+ double highBound = (1.0 + tolerance) * expected;
+ return (actual < highBound && actual > lowBound); // both bounds are exclusive; 'actual' is unboxed here, so null throws NullPointerException
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/edgent/apps/iot/IotDevicePubSub.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/edgent/apps/iot/IotDevicePubSub.java b/apps/iot/src/main/java/edgent/apps/iot/IotDevicePubSub.java
deleted file mode 100644
index c7f8fea..0000000
--- a/apps/iot/src/main/java/edgent/apps/iot/IotDevicePubSub.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.apps.iot;
-
-import static edgent.topology.services.ApplicationService.SYSTEM_APP_PREFIX;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.iot.Events;
-import edgent.connectors.iot.IotDevice;
-import edgent.connectors.iot.QoS;
-import edgent.connectors.pubsub.PublishSubscribe;
-import edgent.topology.TStream;
-import edgent.topology.TopologyElement;
-
-/**
- * Application sharing an {@code IotDevice} through publish-subscribe. <BR>
- * This application shares an {@link IotDevice} across multiple running
- * jobs. This allows a single connection to a back-end message hub to be shared
- * across multiple independent applications, without having to build a single
- * topology.
- * <P>
- * Applications coded to {@link IotDevice} obtain a topology specific
- * {@code IotDevice} using {@link #addIotDevice(TopologyElement)}. This returned
- * device will route events and commands to/from the actual message hub
- * {@code IotDevice} through publish-subscribe.
- * <P>
- * An instance of this application is created by first creating a new topology and
- * then creating a {@link IotDevice} specific to the desired message hub. Then
- * the application is created by calling {@link #createApplication(IotDevice)}
- * passing the {@code IotDevice}. <BR>
- * Then additional independent applications (topologies) can be created and they
- * create a proxy {@code IotDevice} for their topology using
- * {@link #addIotDevice(TopologyElement)}. This proxy {@code IotDevice} is then
- * used to send device events and receive device commands in that topology. <BR>
- * Once all the topologies have been declared they can be submitted.
- * </P>
- * <P>
- * At startup this application sends a single device event with
- * identifier {@link Events#IOT_START}. This performs two functions:
- * </P>
- * <UL>
- * <LI>Initiates a connection to the message hub.</LI>
- * <LI>Allows external applications to be notified (by subscribing to device events)
- * when an Edgent provider starts.</LI>
- * </UL>
- *
- * @see PublishSubscribe
- */
-public class IotDevicePubSub {
-
- /**
- * IotDevicePubSub application name.
- */
- public static final String APP_NAME = SYSTEM_APP_PREFIX + "IotDevicePubSub";
-
- /**
- * Events published to topic {@value} are sent as device events using the
- * actual message hub {@link IotDevice}. <BR>
- * it is recommended applications use the {@code IotDevice} returned by
- * {@link #addIotDevice(TopologyElement)} to send events rather than
- * publishing streams to this topic.
- */
- public static final String EVENTS_TOPIC = PublishSubscribe.RESERVED_TOPIC_PREFIX + "iot/events";
-
- /**
- * Device commands are published to {@value} by
- * this application. <BR>
- * it is recommended applications use the {@code IotDevice} returned by
- * {@link #addIotDevice(TopologyElement)} to receive commands rather than
- * subscribing to streams with this topic prefix.
- */
- public static final String COMMANDS_TOPIC = PublishSubscribe.RESERVED_TOPIC_PREFIX + "iot/commands";
-
- /**
- * Create an instance of this application using {@code device} as the device
- * connection to a message hub.
- *
- * @param device the IotDevice
- */
- public static void createApplication(IotDevice device) {
-
- TStream<JsonObject> events = PublishSubscribe.subscribe(device, EVENTS_TOPIC, JsonObject.class);
-
- device.events(events, ew -> ew.getAsJsonPrimitive("eventId").getAsString(), ew -> ew.getAsJsonObject("event"),
- ew -> ew.getAsJsonPrimitive("qos").getAsInt());
-
- PublishSubscribe.publish(device.commands(), COMMANDS_TOPIC, JsonObject.class);
-
- // Publish a single event at startup
- TStream<JsonObject> start = device.topology().of(new JsonObject());
- device.events(start, Events.IOT_START, QoS.AT_MOST_ONCE);
- }
-
- /**
- * Add a proxy {@code IotDevice} for the topology containing {@code te}.
- * <P>
- * Any events sent through the returned device are sent onto the message hub
- * device through publish-subscribe. <BR>
- * Subscribing to commands using the returned device will subscribe to
- * commands received by the message hub device.
- * </P>
- *
- * @param te
- * Topology the returned device is contained in.
- * @return Proxy device.
- */
- public static IotDevice addIotDevice(TopologyElement te) {
- return new PubSubIotDevice(te.topology());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/edgent/apps/iot/PubSubIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/edgent/apps/iot/PubSubIotDevice.java b/apps/iot/src/main/java/edgent/apps/iot/PubSubIotDevice.java
deleted file mode 100644
index 61af093..0000000
--- a/apps/iot/src/main/java/edgent/apps/iot/PubSubIotDevice.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.apps.iot;
-
-import static edgent.apps.iot.IotDevicePubSub.COMMANDS_TOPIC;
-import static edgent.apps.iot.IotDevicePubSub.EVENTS_TOPIC;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.iot.IotDevice;
-import edgent.connectors.pubsub.PublishSubscribe;
-import edgent.function.Function;
-import edgent.function.UnaryOperator;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-
-
-/**
- * Pub-sub IotDevice that uses publish-subscribe and IotDevicePubSub application
- * to communicate with a single IotDevice connected to a message hub.
- */
-class PubSubIotDevice implements IotDevice {
-
- private final Topology topology;
-
- /**
- * Create a proxy IotDevice
- *
- * @param app
- * IotDevicePubSub application hosting the actual IotDevice.
- * @param topology
- * Topology of the subscribing application.
- */
- PubSubIotDevice(Topology topology) {
- this.topology = topology;
- }
-
- @Override
- public final Topology topology() {
- return topology;
- }
-
- /**
- * Publishes events derived from {@code stream} using the topic
- * {@link EVENTS} as a JsonObject containing eventId, event,
- * and qos keys.
- */
- @Override
- public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
- UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
-
- stream = stream.map(event -> {
- JsonObject publishedEvent = new JsonObject();
-
- publishedEvent.addProperty("eventId", eventId.apply(event));
- publishedEvent.add("event", payload.apply(event));
- publishedEvent.addProperty("qos", qos.apply(event));
-
- return publishedEvent;
- });
-
- return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS_TOPIC, JsonObject.class);
- }
-
- /**
- * Publishes events derived from {@code stream} using the topic
- * {@link IotDevicePubSub#EVENTS} as a JsonObject containing eventId, event,
- * and qos keys.
- */
- @Override
- public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
-
- stream = stream.map(event -> {
- JsonObject publishedEvent = new JsonObject();
-
- publishedEvent.addProperty("eventId", eventId);
- publishedEvent.add("event", event);
- publishedEvent.addProperty("qos", qos);
-
- return publishedEvent;
- });
-
- return PublishSubscribe.publish(stream, EVENTS_TOPIC, JsonObject.class);
- }
-
- /**
- * Subscribes to commands.
- */
- @Override
- public TStream<JsonObject> commands(String... commandIdentifiers) {
-
- TStream<JsonObject> commandsStream = PublishSubscribe.subscribe(this, COMMANDS_TOPIC, JsonObject.class);
-
- if (commandIdentifiers.length > 0) {
- Set<String> cmdIds = new HashSet<>(Arrays.asList(commandIdentifiers));
- commandsStream = commandsStream.filter(
- cmd -> cmdIds.contains(cmd.get(CMD_ID).getAsString()));
- }
-
- return commandsStream;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/edgent/apps/iot/package-info.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/edgent/apps/iot/package-info.java b/apps/iot/src/main/java/edgent/apps/iot/package-info.java
deleted file mode 100644
index cecb456..0000000
--- a/apps/iot/src/main/java/edgent/apps/iot/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Applications for use in an Internet of Things environment.
- */
-package edgent.apps.iot;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/org/apache/edgent/apps/iot/IotDevicePubSub.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/org/apache/edgent/apps/iot/IotDevicePubSub.java b/apps/iot/src/main/java/org/apache/edgent/apps/iot/IotDevicePubSub.java
new file mode 100644
index 0000000..075fd89
--- /dev/null
+++ b/apps/iot/src/main/java/org/apache/edgent/apps/iot/IotDevicePubSub.java
@@ -0,0 +1,128 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.apps.iot;
+
+import static org.apache.edgent.topology.services.ApplicationService.SYSTEM_APP_PREFIX;
+
+import org.apache.edgent.connectors.iot.Events;
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.connectors.iot.QoS;
+import org.apache.edgent.connectors.pubsub.PublishSubscribe;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TopologyElement;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Application sharing an {@code IotDevice} through publish-subscribe. <BR>
+ * This application shares an {@link IotDevice} across multiple running
+ * jobs. This allows a single connection to a back-end message hub to be shared
+ * across multiple independent applications, without having to build a single
+ * topology.
+ * <P>
+ * Applications coded to {@link IotDevice} obtain a topology specific
+ * {@code IotDevice} using {@link #addIotDevice(TopologyElement)}. This returned
+ * device will route events and commands to/from the actual message hub
+ * {@code IotDevice} through publish-subscribe.
+ * <P>
+ * An instance of this application is created by first creating a new topology and
+ * then creating an {@link IotDevice} specific to the desired message hub. Then
+ * the application is created by calling {@link #createApplication(IotDevice)}
+ * passing the {@code IotDevice}. <BR>
+ * Then additional independent applications (topologies) can be created and they
+ * create a proxy {@code IotDevice} for their topology using
+ * {@link #addIotDevice(TopologyElement)}. This proxy {@code IotDevice} is then
+ * used to send device events and receive device commands in that topology. <BR>
+ * Once all the topologies have been declared they can be submitted.
+ * </P>
+ * <P>
+ * At startup this application sends a single device event with
+ * identifier {@link Events#IOT_START}. This performs two functions:
+ * </P>
+ * <UL>
+ * <LI>Initiates a connection to the message hub.</LI>
+ * <LI>Allows external applications to be notified (by subscribing to device events)
+ * when an Edgent provider starts.</LI>
+ * </UL>
+ *
+ * @see PublishSubscribe
+ */
+public class IotDevicePubSub {
+
+ /**
+ * IotDevicePubSub application name.
+ */
+ public static final String APP_NAME = SYSTEM_APP_PREFIX + "IotDevicePubSub";
+
+ /**
+ * Events published to topic {@value} are sent as device events using the
+ * actual message hub {@link IotDevice}. <BR>
+ * It is recommended that applications use the {@code IotDevice} returned by
+ * {@link #addIotDevice(TopologyElement)} to send events rather than
+ * publishing streams to this topic.
+ */
+ public static final String EVENTS_TOPIC = PublishSubscribe.RESERVED_TOPIC_PREFIX + "iot/events";
+
+ /**
+ * Device commands are published to {@value} by
+ * this application. <BR>
+ * It is recommended that applications use the {@code IotDevice} returned by
+ * {@link #addIotDevice(TopologyElement)} to receive commands rather than
+ * subscribing to streams with this topic prefix.
+ */
+ public static final String COMMANDS_TOPIC = PublishSubscribe.RESERVED_TOPIC_PREFIX + "iot/commands";
+
+ /**
+ * Create an instance of this application using {@code device} as the device
+ * connection to a message hub: events published to {@link #EVENTS_TOPIC} are sent through {@code device} and its commands are published to {@link #COMMANDS_TOPIC}.
+ *
+ * @param device the IotDevice providing the connection to the message hub
+ */
+ public static void createApplication(IotDevice device) {
+ // Subscribe to the JsonObjects published to EVENTS_TOPIC.
+ TStream<JsonObject> events = PublishSubscribe.subscribe(device, EVENTS_TOPIC, JsonObject.class);
+ // Send each one as a device event, taking the event id, payload and QoS from its eventId, event and qos members.
+ device.events(events, ew -> ew.getAsJsonPrimitive("eventId").getAsString(), ew -> ew.getAsJsonObject("event"),
+ ew -> ew.getAsJsonPrimitive("qos").getAsInt());
+ // Publish the stream returned by device.commands() to COMMANDS_TOPIC.
+ PublishSubscribe.publish(device.commands(), COMMANDS_TOPIC, JsonObject.class);
+
+ // Publish a single event at startup: an empty JsonObject sent with identifier Events.IOT_START
+ TStream<JsonObject> start = device.topology().of(new JsonObject());
+ device.events(start, Events.IOT_START, QoS.AT_MOST_ONCE);
+ }
+
+ /**
+ * Add a proxy {@code IotDevice} for the topology containing {@code te}.
+ * <P>
+ * Any events sent through the returned device are sent onto the message hub
+ * device through publish-subscribe. <BR>
+ * Subscribing to commands using the returned device will subscribe to
+ * commands received by the message hub device.
+ * </P>
+ *
+ * @param te
+ * Element whose topology the returned device is contained in.
+ * @return Proxy device.
+ */
+ public static IotDevice addIotDevice(TopologyElement te) {
+ return new PubSubIotDevice(te.topology());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java b/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java
new file mode 100644
index 0000000..0110a01
--- /dev/null
+++ b/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java
@@ -0,0 +1,124 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.apps.iot;
+
+import static org.apache.edgent.apps.iot.IotDevicePubSub.COMMANDS_TOPIC;
+import static org.apache.edgent.apps.iot.IotDevicePubSub.EVENTS_TOPIC;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.connectors.pubsub.PublishSubscribe;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.UnaryOperator;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import com.google.gson.JsonObject;
+
+
+/**
+ * Pub-sub IotDevice that uses publish-subscribe and IotDevicePubSub application
+ * to communicate with a single IotDevice connected to a message hub.
+ */
+class PubSubIotDevice implements IotDevice {
+
+ private final Topology topology;
+
+ /**
+ * Create a proxy IotDevice. The actual IotDevice is hosted by the
+ * IotDevicePubSub application and is reached only through the
+ * publish-subscribe topics {@code EVENTS_TOPIC} and {@code COMMANDS_TOPIC}.
+ *
+ * @param topology
+ * Topology of the subscribing application.
+ */
+ PubSubIotDevice(Topology topology) {
+ this.topology = topology;
+ }
+
+ @Override
+ public final Topology topology() {
+ return topology;
+ }
+
+ /**
+ * Publishes events derived from {@code stream} using the topic
+ * {@link IotDevicePubSub#EVENTS_TOPIC} as a JsonObject containing eventId, event,
+ * and qos keys.
+ */
+ @Override
+ public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+ UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
+ // Wrap each tuple in a JsonObject carrying the values computed by the eventId, payload and qos functions.
+ stream = stream.map(event -> {
+ JsonObject publishedEvent = new JsonObject();
+
+ publishedEvent.addProperty("eventId", eventId.apply(event));
+ publishedEvent.add("event", payload.apply(event));
+ publishedEvent.addProperty("qos", qos.apply(event));
+
+ return publishedEvent;
+ });
+
+ return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS_TOPIC, JsonObject.class);
+ }
+
+ /**
+ * Publishes events derived from {@code stream} using the topic
+ * {@link IotDevicePubSub#EVENTS_TOPIC} as a JsonObject containing eventId, event,
+ * and qos keys.
+ */
+ @Override
+ public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
+ // Wrap each tuple in a JsonObject with the fixed eventId and qos; the tuple itself is the event payload.
+ stream = stream.map(event -> {
+ JsonObject publishedEvent = new JsonObject();
+
+ publishedEvent.addProperty("eventId", eventId);
+ publishedEvent.add("event", event);
+ publishedEvent.addProperty("qos", qos);
+
+ return publishedEvent;
+ });
+
+ return PublishSubscribe.publish(stream, EVENTS_TOPIC, JsonObject.class);
+ }
+
+ /**
+ * Subscribes to commands published on {@code COMMANDS_TOPIC}; when identifiers are given, only commands whose CMD_ID member equals one of them are returned.
+ */
+ @Override
+ public TStream<JsonObject> commands(String... commandIdentifiers) {
+
+ TStream<JsonObject> commandsStream = PublishSubscribe.subscribe(this, COMMANDS_TOPIC, JsonObject.class);
+ // With no identifiers the unfiltered stream is returned.
+ if (commandIdentifiers.length > 0) {
+ Set<String> cmdIds = new HashSet<>(Arrays.asList(commandIdentifiers));
+ commandsStream = commandsStream.filter(
+ cmd -> cmdIds.contains(cmd.get(CMD_ID).getAsString())); // NOTE(review): presumably throws if a command has no CMD_ID member - confirm
+ }
+
+ return commandsStream;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/main/java/org/apache/edgent/apps/iot/package-info.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/org/apache/edgent/apps/iot/package-info.java b/apps/iot/src/main/java/org/apache/edgent/apps/iot/package-info.java
new file mode 100644
index 0000000..c6aa983
--- /dev/null
+++ b/apps/iot/src/main/java/org/apache/edgent/apps/iot/package-info.java
@@ -0,0 +1,23 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Applications for use in an Internet of Things environment.
+ */
+package org.apache.edgent.apps.iot;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/test/java/edgent/test/apps/iot/EchoIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/edgent/test/apps/iot/EchoIotDevice.java b/apps/iot/src/test/java/edgent/test/apps/iot/EchoIotDevice.java
deleted file mode 100644
index c81edcc..0000000
--- a/apps/iot/src/test/java/edgent/test/apps/iot/EchoIotDevice.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.test.apps.iot;
-
-import static edgent.function.Functions.discard;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.iot.IotDevice;
-import edgent.function.Function;
-import edgent.function.UnaryOperator;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-
-/**
- * A test IotDevice that echos back every event as a command with command
- * identifier equal to the {@code cmdId} value in the event payload. If {@code cmdId}
- * is not set then the event identifier is used.
- *
- */
-public class EchoIotDevice implements IotDevice {
-
- public static final String EVENT_CMD_ID = "cmdId";
-
- private final Topology topology;
- private TStream<JsonObject> echoCmds;
-
- public EchoIotDevice(Topology topology) {
- this.topology = topology;
- }
-
- @Override
- public Topology topology() {
- return topology;
- }
-
- @Override
- public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
- UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
-
- stream = stream.map(e -> {
- JsonObject c = new JsonObject();
- JsonObject evPayload = payload.apply(e);
- c.addProperty(CMD_ID, getCommandIdFromEvent(eventId.apply(e), evPayload));
- c.add(CMD_PAYLOAD, evPayload);
- c.addProperty(CMD_FORMAT, "json");
- c.addProperty(CMD_TS, System.currentTimeMillis());
- return c;
- });
-
- return handleEvents(stream);
- }
-
- private static String getCommandIdFromEvent(String eventId, JsonObject evPayload) {
- if (evPayload.has(EVENT_CMD_ID))
- return evPayload.getAsJsonPrimitive(EVENT_CMD_ID).getAsString();
- else
- return eventId;
- }
-
- @Override
- public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
-
- stream = stream.map(e -> {
- JsonObject c = new JsonObject();
- c.addProperty(CMD_ID, getCommandIdFromEvent(eventId, e));
- c.add(CMD_PAYLOAD, e);
- c.addProperty(CMD_FORMAT, "json");
- c.addProperty(CMD_TS, System.currentTimeMillis());
- return c;
- });
-
- return handleEvents(stream);
- }
-
- private TSink<JsonObject> handleEvents(TStream<JsonObject> stream) {
-
- if (echoCmds == null)
- echoCmds = PlumbingStreams.isolate(stream, true);
- else
- echoCmds = PlumbingStreams.isolate(stream.union(echoCmds), true);
-
- return stream.sink(discard());
- }
-
- @Override
- public TStream<JsonObject> commands(String... commands) {
- if (commands.length == 0)
- return echoCmds;
-
- Set<String> cmds = new HashSet<>(Arrays.asList(commands));
- return echoCmds.filter(cmd -> cmds.contains(cmd.getAsJsonPrimitive(CMD_ID).getAsString()));
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/test/java/edgent/test/apps/iot/IotDevicePubSubTest.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/edgent/test/apps/iot/IotDevicePubSubTest.java b/apps/iot/src/test/java/edgent/test/apps/iot/IotDevicePubSubTest.java
deleted file mode 100644
index f92b425..0000000
--- a/apps/iot/src/test/java/edgent/test/apps/iot/IotDevicePubSubTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.test.apps.iot;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-import com.google.gson.JsonObject;
-
-import edgent.apps.iot.IotDevicePubSub;
-import edgent.connectors.iot.IotDevice;
-import edgent.connectors.iot.QoS;
-import edgent.connectors.pubsub.service.ProviderPubSub;
-import edgent.connectors.pubsub.service.PublishSubscribeService;
-import edgent.execution.Job;
-import edgent.execution.Job.Action;
-import edgent.providers.direct.DirectProvider;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-import edgent.topology.tester.Condition;
-
-public class IotDevicePubSubTest {
-
-
-
- @Test
- public void testIotDevicePubSubApp() throws Exception {
- DirectProvider dp = new DirectProvider();
-
- dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
-
- Topology iot = dp.newTopology("IotPubSub");
- IotDevicePubSub.createApplication(new EchoIotDevice(iot));
-
- Topology app1 = dp.newTopology("App1");
-
- IotDevice app1Iot = IotDevicePubSub.addIotDevice(app1);
-
- TStream<String> data = app1.strings("A", "B", "C");
-
- // Without this the tuple can be published and discarded before the
- // subscriber is hooked up.
- data = PlumbingStreams.blockingOneShotDelay(data, 500, TimeUnit.MILLISECONDS);
-
- TStream<JsonObject> events = data.map(
- s -> {JsonObject j = new JsonObject(); j.addProperty("v", s); return j;});
- app1Iot.events(events, "ps1", QoS.FIRE_AND_FORGET);
-
- TStream<JsonObject> echoedCmds = app1Iot.commands("ps1");
-
- TStream<String> ecs = echoedCmds.map(j -> j.getAsJsonObject(IotDevice.CMD_PAYLOAD).getAsJsonPrimitive("v").getAsString());
- Condition<List<String>> tcEcho = app1.getTester().streamContents(ecs, "A", "B", "C"); // Expect all tuples
-
- Job jIot = dp.submit(iot.topology()).get();
- Job jApp = dp.submit(app1.topology()).get();
-
- for (int i = 0; i < 50 && !tcEcho.valid(); i++) {
- Thread.sleep(50);
- }
-
- assertTrue(tcEcho.getResult().toString(), tcEcho.valid());
-
- jIot.stateChange(Action.CLOSE);
- jApp.stateChange(Action.CLOSE);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java b/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java
new file mode 100644
index 0000000..65b217f
--- /dev/null
+++ b/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.apps.iot;
+
+import static org.apache.edgent.function.Functions.discard;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.UnaryOperator;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+
+import com.google.gson.JsonObject;
+
+/**
+ * A test IotDevice that echoes back every event as a command with command
+ * identifier equal to the {@code cmdId} value in the event payload. If {@code cmdId}
+ * is not set then the event identifier is used.
+ *
+ */
+public class EchoIotDevice implements IotDevice {
+
+ public static final String EVENT_CMD_ID = "cmdId";
+
+ private final Topology topology;
+ private TStream<JsonObject> echoCmds;
+
+ public EchoIotDevice(Topology topology) {
+ this.topology = topology;
+ }
+
+ @Override
+ public Topology topology() {
+ return topology;
+ }
+
+ @Override
+ public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+ UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
+
+ stream = stream.map(e -> {
+ JsonObject c = new JsonObject();
+ JsonObject evPayload = payload.apply(e);
+ c.addProperty(CMD_ID, getCommandIdFromEvent(eventId.apply(e), evPayload));
+ c.add(CMD_PAYLOAD, evPayload);
+ c.addProperty(CMD_FORMAT, "json");
+ c.addProperty(CMD_TS, System.currentTimeMillis());
+ return c;
+ });
+
+ return handleEvents(stream);
+ }
+
+ private static String getCommandIdFromEvent(String eventId, JsonObject evPayload) {
+ if (evPayload.has(EVENT_CMD_ID))
+ return evPayload.getAsJsonPrimitive(EVENT_CMD_ID).getAsString();
+ else
+ return eventId;
+ }
+
+ @Override
+ public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
+
+ stream = stream.map(e -> {
+ JsonObject c = new JsonObject();
+ c.addProperty(CMD_ID, getCommandIdFromEvent(eventId, e));
+ c.add(CMD_PAYLOAD, e);
+ c.addProperty(CMD_FORMAT, "json");
+ c.addProperty(CMD_TS, System.currentTimeMillis());
+ return c;
+ });
+
+ return handleEvents(stream);
+ }
+
+ private TSink<JsonObject> handleEvents(TStream<JsonObject> stream) {
+
+ if (echoCmds == null)
+ echoCmds = PlumbingStreams.isolate(stream, true);
+ else
+ echoCmds = PlumbingStreams.isolate(stream.union(echoCmds), true);
+
+ return stream.sink(discard());
+ }
+
+ @Override
+ public TStream<JsonObject> commands(String... commands) {
+ if (commands.length == 0)
+ return echoCmds;
+
+ Set<String> cmds = new HashSet<>(Arrays.asList(commands));
+ return echoCmds.filter(cmd -> cmds.contains(cmd.getAsJsonPrimitive(CMD_ID).getAsString()));
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/IotDevicePubSubTest.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/IotDevicePubSubTest.java b/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/IotDevicePubSubTest.java
new file mode 100644
index 0000000..f34de4f
--- /dev/null
+++ b/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/IotDevicePubSubTest.java
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.apps.iot;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.apps.iot.IotDevicePubSub;
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.connectors.iot.QoS;
+import org.apache.edgent.connectors.pubsub.service.ProviderPubSub;
+import org.apache.edgent.connectors.pubsub.service.PublishSubscribeService;
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Job.Action;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+public class IotDevicePubSubTest {
+
+
+
+ @Test
+ public void testIotDevicePubSubApp() throws Exception {
+ DirectProvider dp = new DirectProvider();
+
+ dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
+
+ Topology iot = dp.newTopology("IotPubSub");
+ IotDevicePubSub.createApplication(new EchoIotDevice(iot));
+
+ Topology app1 = dp.newTopology("App1");
+
+ IotDevice app1Iot = IotDevicePubSub.addIotDevice(app1);
+
+ TStream<String> data = app1.strings("A", "B", "C");
+
+ // Without this the tuple can be published and discarded before the
+ // subscriber is hooked up.
+ data = PlumbingStreams.blockingOneShotDelay(data, 500, TimeUnit.MILLISECONDS);
+
+ TStream<JsonObject> events = data.map(
+ s -> {JsonObject j = new JsonObject(); j.addProperty("v", s); return j;});
+ app1Iot.events(events, "ps1", QoS.FIRE_AND_FORGET);
+
+ TStream<JsonObject> echoedCmds = app1Iot.commands("ps1");
+
+ TStream<String> ecs = echoedCmds.map(j -> j.getAsJsonObject(IotDevice.CMD_PAYLOAD).getAsJsonPrimitive("v").getAsString());
+ Condition<List<String>> tcEcho = app1.getTester().streamContents(ecs, "A", "B", "C"); // Expect all tuples
+
+ Job jIot = dp.submit(iot.topology()).get();
+ Job jApp = dp.submit(app1.topology()).get();
+
+ for (int i = 0; i < 50 && !tcEcho.valid(); i++) {
+ Thread.sleep(50);
+ }
+
+ assertTrue(tcEcho.getResult().toString(), tcEcho.valid());
+
+ jIot.stateChange(Action.CLOSE);
+ jApp.stateChange(Action.CLOSE);
+ }
+}