You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/10 00:59:09 UTC

[1/5] incubator-quarks git commit: QUARKS-4 [WIP] Add building apps into build system

Repository: incubator-quarks
Updated Branches:
  refs/heads/master 2ca0f8802 -> aaac8cb77


QUARKS-4 [WIP] Add building apps into build system


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/8fe7bf15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/8fe7bf15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/8fe7bf15

Branch: refs/heads/master
Commit: 8fe7bf15fb016fb317d83b4ab4541a8e51effcd2
Parents: 7f5cf08
Author: Dan Debrunner <dj...@apache.org>
Authored: Mon Mar 7 20:10:17 2016 -0800
Committer: Daniel J. Debrunner <de...@us.ibm.com>
Committed: Wed Mar 9 15:36:01 2016 -0800

----------------------------------------------------------------------
 apps/iot/build.xml | 3 ++-
 build.xml          | 1 +
 2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/8fe7bf15/apps/iot/build.xml
----------------------------------------------------------------------
diff --git a/apps/iot/build.xml b/apps/iot/build.xml
index 193fdb0..0ce57d3 100644
--- a/apps/iot/build.xml
+++ b/apps/iot/build.xml
@@ -5,10 +5,11 @@
         Applications utilizing IotDevice
     </description>
 
+  <property name="component.path" value="apps/iot"/>
   <import file="../../common-build.xml"/>
 
   <path id="compile.classpath">
-    <pathelement location="${lib}/quarks.api.topology.jar" />
+    <pathelement location="${quarks.lib}/quarks.api.topology.jar" />
     <pathelement location="${quarks.connectors}/pubsub/lib/quarks.connectors.pubsub.jar" />
     <pathelement location="${quarks.connectors}/iot/lib/quarks.connectors.iot.jar" />
   </path>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/8fe7bf15/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 6c34d99..4ebb7d2 100644
--- a/build.xml
+++ b/build.xml
@@ -113,6 +113,7 @@
         <ant dir="connectors/wsclient-javax.websocket" target="@{target}" useNativeBasedir="true"/>
         <ant dir="console/server" target="@{target}" useNativeBasedir="true" />
         <ant dir="console/servlets" target="@{target}" useNativeBasedir="true" />
+        <ant dir="apps/iot" target="@{target}" useNativeBasedir="true" />
         <ant dir="providers/development" target="@{target}" useNativeBasedir="true"/>
         <ant dir="analytics/math3" target="@{target}" useNativeBasedir="true"/>
         <ant dir="analytics/sensors" target="@{target}" useNativeBasedir="true"/>


[4/5] incubator-quarks git commit: Add tests for IotDevicePubSub application and working setup

Posted by dj...@apache.org.
Add tests for IotDevicePubSub application and working setup


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/afa455ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/afa455ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/afa455ab

Branch: refs/heads/master
Commit: afa455aba142d5e9f08cb53824e8dbba318f0fd3
Parents: b412625
Author: Dan Debrunner <dj...@apache.org>
Authored: Tue Mar 8 10:12:24 2016 -0800
Committer: Daniel J. Debrunner <de...@us.ibm.com>
Committed: Wed Mar 9 15:36:02 2016 -0800

----------------------------------------------------------------------
 apps/.classpath                                 |   1 +
 apps/.gitignore                                 |   5 +
 apps/iot/build.xml                              |   2 +-
 .../java/quarks/apps/iot/IotDevicePubSub.java   | 158 ++++++++-----------
 .../java/quarks/apps/iot/ProxyIotDevice.java    |  75 +++++----
 .../quarks/test/apps/iot/EchoIotDevice.java     | 128 +++++++++++++++
 .../test/apps/iot/IotDevicePubSubTest.java      |  88 +++++++++++
 7 files changed, 328 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/.classpath
----------------------------------------------------------------------
diff --git a/apps/.classpath b/apps/.classpath
index a44dd95..5e6e513 100644
--- a/apps/.classpath
+++ b/apps/.classpath
@@ -1,6 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry kind="src" path="iot/src/main/java"/>
+	<classpathentry kind="src" path="iot/src/test/java"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry combineaccessrules="false" kind="src" path="/ext"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/.gitignore
----------------------------------------------------------------------
diff --git a/apps/.gitignore b/apps/.gitignore
new file mode 100644
index 0000000..948fe44
--- /dev/null
+++ b/apps/.gitignore
@@ -0,0 +1,5 @@
+/classes/
+**/test.classes/
+**/classes/
+**/unittests/
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/iot/build.xml
----------------------------------------------------------------------
diff --git a/apps/iot/build.xml b/apps/iot/build.xml
index 0ce57d3..838531b 100644
--- a/apps/iot/build.xml
+++ b/apps/iot/build.xml
@@ -16,7 +16,7 @@
 
   <path id="test.compile.classpath">
     <pathelement location="${jar}" />
-    <pathelement location="../../api/topology/test.classes" />
+    <pathelement location="${quarks.lib}/quarks.providers.direct.jar" />
     <path refid="compile.classpath"/>
   </path>
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java b/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
index 481e74e..d0ddb47 100644
--- a/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
+++ b/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
@@ -30,82 +30,67 @@ import quarks.topology.TStream;
 import quarks.topology.TopologyElement;
 
 /**
- * Application sharing an {@code IotDevice}
- * through publish-subscribe.
- * <BR>
- * This application allows sharing an {@link IotDevice}
- * across multiple running jobs. This allows a single
- * connection to a back-end message hub to be shared across
- * multiple independent applications, without having to
- * build a single topology.
+ * Application sharing an {@code IotDevice} through publish-subscribe. <BR>
+ * This application allows sharing an {@link IotDevice} across multiple running
+ * jobs. This allows a single connection to a back-end message hub to be shared
+ * across multiple independent applications, without having to build a single
+ * topology.
  * <P>
- * Applications coded to {@link IotDevice}
- * obtain a topology specific {@code IotDevice} using
- * {@link #addIotDevice(TopologyElement)}. This returned
- * device will route events and commands to/from the
- * actual message hub {@code IotDevice} through
- * publish-subscribe.
+ * Applications coded to {@link IotDevice} obtain a topology specific
+ * {@code IotDevice} using {@link #addIotDevice(TopologyElement)}. This returned
+ * device will route events and commands to/from the actual message hub
+ * {@code IotDevice} through publish-subscribe.
  * <P>
- * An instance of this application is created by creating
- * a new topology and then creating a {@link IotDevice}
- * specific to the desired message hub. Then the application
- * is created by calling {@link #IotDevicePubSub(IotDevice)}
- * passing the {@code IotDevice}.
- * <BR>
- * Then additional independent applications (topologies) can be created
- * and they create a proxy {@code IotDevice} for their topology using
- * {@link #addIotDevice(TopologyElement)}. This proxy  {@code IotDevice}
- * is then used to send device events and receive device commands in
- * that topology.
- * <BR>
+ * An instance of this application is created by creating a new topology and
+ * then creating an {@link IotDevice} specific to the desired message hub. Then
+ * the application is created by calling {@link #IotDevicePubSub(IotDevice)}
+ * passing the {@code IotDevice}. <BR>
+ * Then additional independent applications (topologies) can be created and they
+ * create a proxy {@code IotDevice} for their topology using
+ * {@link #addIotDevice(TopologyElement)}. This proxy {@code IotDevice} is then
+ * used to send device events and receive device commands in that topology. <BR>
  * Once all the topologies have been declared they can be submitted.
  * </P>
  * <P>
  * Limitations:
  * <UL>
- * <LI>
- * Subscribing to all device commands (passing no arguments to {@link IotDevice#commands(String...)} is
- * not supported by the proxy {@code IotDevice}. 
- * </LI>
- * <LI>
- * All applications that subscribe to device commands must be declared before
- * the instance of this application is submitted.
- * </LI>
+ * <LI>Subscribing to all device commands (passing no arguments to
+ * {@link IotDevice#commands(String...)}) is not supported by the proxy
+ * {@code IotDevice}.</LI>
+ * <LI>All applications that subscribe to device commands must be declared
+ * before the instance of this application is submitted.</LI>
  * </UL>
  * </P>
  */
 public class IotDevicePubSub {
-	
-	/**
-	 * Events published to topic {@value} are sent as device
-	 * events using the actual message hub {@link IotDevice}.
-	 * <BR>
-	 * it is recommended applications use the {@code IotDevice}
-	 * returned by {@link #addIotDevice(TopologyElement)} to
-	 * send events rather than publishing streams to this
-	 * topic.
-	 */
+
+    /**
+     * Events published to topic {@value} are sent as device events using the
+     * actual message hub {@link IotDevice}. <BR>
+     * It is recommended that applications use the {@code IotDevice} returned by
+     * {@link #addIotDevice(TopologyElement)} to send events rather than
+     * publishing streams to this topic.
+     */
     public static final String EVENTS = "quarks/iot/events";
-    
-	/**
-	 * Device commands are published to {@code quarks/iot/command/commandId}
-	 * by this application.
-	 * <BR>
-	 * it is recommended applications use the {@code IotDevice}
-	 * returned by {@link #addIotDevice(TopologyElement)} to
-	 * send events rather than subscribing to streams with this
-	 * topic prefix.
-	 */
+
+    /**
+     * Device commands are published to {@code quarks/iot/command/commandId} by
+     * this application. <BR>
+     * It is recommended that applications use the {@code IotDevice} returned by
+     * {@link #addIotDevice(TopologyElement)} to receive commands rather than
+     * subscribing to streams with this topic prefix.
+     */
     public static final String COMMANDS_PREFIX = "quarks/iot/command/";
 
-	
     private final IotDevice device;
     private final Set<String> publishedCommandTopics = new HashSet<>();
-    
+
     /**
-     * Create an instance of this application using {@code device}
-     * as the device connection to a message hub.
-     * @param device Device to a message hub.
+     * Create an instance of this application using {@code device} as the device
+     * connection to a message hub.
+     * 
+     * @param device
+     *            Device to a message hub.
      */
     public IotDevicePubSub(IotDevice device) {
         this.device = device;
@@ -113,60 +98,53 @@ public class IotDevicePubSub {
     }
 
     /**
-     * Add a proxy {@code IotDevice} for the topology
-     * containing {@code te}.
+     * Add a proxy {@code IotDevice} for the topology containing {@code te}.
      * <P>
-     * Any events sent through the returned device
-     * are sent onto the message hub device through
-     * publish-subscribe.
-     * <BR>
-     * Subscribing to commands using the returned
-     * device will subscribe to commands received
-     * by the message hub device. 
+     * Any events sent through the returned device are sent onto the message hub
+     * device through publish-subscribe. <BR>
+     * Subscribing to commands using the returned device will subscribe to
+     * commands received by the message hub device.
      * </P>
      * 
-     * @param te Topology the returned device is contained in.
+     * @param te
+     *            Topology the returned device is contained in.
      * @return Proxy device.
      */
-    public IotDevice addIotDevice(TopologyElement te) {       
+    public IotDevice addIotDevice(TopologyElement te) {
         return new ProxyIotDevice(this, te.topology());
     }
-    
+
     /**
-     * Create initial application subscribing to
-     * events and delivering them to the message
-     * hub IotDevice.
+     * Create initial application subscribing to events and delivering them to
+     * the message hub IotDevice.
      * 
      */
     private void createApplication() {
-        
-        TStream<JsonObject> events = PublishSubscribe.subscribe(device,
-                EVENTS, JsonObject.class);
-        
-        device.events(events,
-                ew -> ew.getAsJsonPrimitive("eventId").getAsString(),
-                ew -> ew.getAsJsonObject("event"),
+
+        TStream<JsonObject> events = PublishSubscribe.subscribe(device, EVENTS, JsonObject.class);
+
+        device.events(events, ew -> ew.getAsJsonPrimitive("eventId").getAsString(), ew -> ew.getAsJsonObject("event"),
                 ew -> ew.getAsJsonPrimitive("qos").getAsInt());
     }
-    
+
     /**
-     * Subscribe to device commands.
-     * Keep track of which command identifiers are subscribed to
-     * so the stream for a command identifier is only created once.
+     * Subscribe to device commands. Keep track of which command identifiers are
+     * subscribed to so the stream for a command identifier is only created
+     * once.
      * 
      * @param commandIdentifier
-     * @return Topic that needs to be subscribed to in the
-     * subscriber application.
+     * @return Topic that needs to be subscribed to in the subscriber
+     *         application.
      */
     synchronized String subscribeToCommand(String commandIdentifier) {
-        
+
         String topic = COMMANDS_PREFIX + commandIdentifier;
-        
+
         if (!publishedCommandTopics.contains(topic)) {
             TStream<JsonObject> systemStream = device.commands(commandIdentifier);
             PublishSubscribe.publish(systemStream, topic, JsonObject.class);
         }
-        
+
         return topic;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java b/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
index 0c3b185..bfbf229 100644
--- a/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
+++ b/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
@@ -32,20 +32,22 @@ import quarks.topology.TStream;
 import quarks.topology.Topology;
 
 /**
- * Proxy IotDevice that uses publish-subscribe
- * and IotDevicePubSub application to communicate
- * with a single IotDevice connected to a message hub.
+ * Proxy IotDevice that uses publish-subscribe and IotDevicePubSub application
+ * to communicate with a single IotDevice connected to a message hub.
  */
 class ProxyIotDevice implements IotDevice {
-    
-	private IotDevicePubSub app;
-  
+
+    private IotDevicePubSub app;
+
     private final Topology topology;
-    
+
     /**
      * Create a proxy IotDevice
-     * @param app IotDevicePubSub application hosting the actual IotDevice.
-     * @param topology Topology of the subscribing application.
+     * 
+     * @param app
+     *            IotDevicePubSub application hosting the actual IotDevice.
+     * @param topology
+     *            Topology of the subscribing application.
      */
     ProxyIotDevice(IotDevicePubSub app, Topology topology) {
         this.app = app;
@@ -58,60 +60,57 @@ class ProxyIotDevice implements IotDevice {
     }
 
     /**
-     * Publishes events derived from {@code stream} using the
-     * topic {@link IotDevicePubSub#EVENTS} as a JsonObject
-     * containing eventId, event, and qos keys.
+     * Publishes events derived from {@code stream} using the topic
+     * {@link IotDevicePubSub#EVENTS} as a JsonObject containing eventId, event,
+     * and qos keys.
      */
     @Override
     public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
             UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
-        
-        stream = stream.map(event ->
-         {
-             JsonObject publishedEvent = new JsonObject();
-             
-             publishedEvent.addProperty("eventId", eventId.apply(event));
-             publishedEvent.add("event", payload.apply(event));
-             publishedEvent.addProperty("qos", qos.apply(event));
-                         
-             return publishedEvent;
-         });
-        
+
+        stream = stream.map(event -> {
+            JsonObject publishedEvent = new JsonObject();
+
+            publishedEvent.addProperty("eventId", eventId.apply(event));
+            publishedEvent.add("event", payload.apply(event));
+            publishedEvent.addProperty("qos", qos.apply(event));
+
+            return publishedEvent;
+        });
+
         return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS, JsonObject.class);
     }
 
     /**
-     * Publishes events derived from {@code stream} using the
-     * topic {@link IotDevicePubSub#EVENTS} as a JsonObject
-     * containing eventId, event, and qos keys.
+     * Publishes events derived from {@code stream} using the topic
+     * {@link IotDevicePubSub#EVENTS} as a JsonObject containing eventId, event,
+     * and qos keys.
      */
     @Override
     public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
 
-        stream = stream.map(event ->
-        {
+        stream = stream.map(event -> {
             JsonObject publishedEvent = new JsonObject();
-            
+
             publishedEvent.addProperty("eventId", eventId);
             publishedEvent.add("event", event);
             publishedEvent.addProperty("qos", qos);
-                        
+
             return publishedEvent;
         });
-        
+
         return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS, JsonObject.class);
     }
-    
+
     /**
-     * Subscribes to commands.
-     * Doesn't yet support subscribing to all commands.
+     * Subscribes to commands. Doesn't yet support subscribing to all commands.
      */
     @Override
     public TStream<JsonObject> commands(String... commandIdentifiers) {
-        
+
         final String firstTopic = app.subscribeToCommand(commandIdentifiers[0]);
         TStream<JsonObject> commandsStream = PublishSubscribe.subscribe(this, firstTopic, JsonObject.class);
-        
+
         if (commandIdentifiers.length > 1) {
             Set<TStream<JsonObject>> additionalStreams = new HashSet<>();
             for (int i = 1; i < commandIdentifiers.length; i++) {
@@ -120,7 +119,7 @@ class ProxyIotDevice implements IotDevice {
             }
             commandsStream = commandsStream.union(additionalStreams);
         }
-     
+
         return commandsStream;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/iot/src/test/java/quarks/test/apps/iot/EchoIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/quarks/test/apps/iot/EchoIotDevice.java b/apps/iot/src/test/java/quarks/test/apps/iot/EchoIotDevice.java
new file mode 100644
index 0000000..fc91668
--- /dev/null
+++ b/apps/iot/src/test/java/quarks/test/apps/iot/EchoIotDevice.java
@@ -0,0 +1,128 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.test.apps.iot;
+
+import static quarks.function.Functions.alwaysTrue;
+import static quarks.function.Functions.discard;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.gson.JsonObject;
+
+import quarks.connectors.iot.IotDevice;
+import quarks.function.Function;
+import quarks.function.UnaryOperator;
+import quarks.topology.TSink;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.plumbing.PlumbingStreams;
+
+/**
+ * A test IotDevice that echoes back every event as a command with command
+ * identifier equal to the {@code cmdId} value in the event payload. If {@code cmdId}
+ * is not set then {@code ec_eventId} is used.
+ *
+ */
+public class EchoIotDevice implements IotDevice {
+    
+    public static final String EVENT_CMD_ID = "cmdId";
+
+    private final Topology topology;
+    private TStream<JsonObject> echoCmds;
+
+    public EchoIotDevice(Topology topology) {
+        this.topology = topology;
+    }
+
+    @Override
+    public Topology topology() {
+        return topology;
+    }
+
+    @Override
+    public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+            UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
+        
+        stream = stream.map(e -> {
+            JsonObject c = new JsonObject();
+            JsonObject evPayload = payload.apply(e);
+            c.addProperty(CMD_ID, getCommandIdFromEvent(eventId.apply(e), evPayload));
+            c.add(CMD_PAYLOAD, evPayload);
+            c.addProperty(CMD_FORMAT, "json");
+            c.addProperty(CMD_TS, System.currentTimeMillis());
+            return c;
+        });
+        
+        return handleEvents(stream);
+    }
+    
+    private static String getCommandIdFromEvent(String eventId, JsonObject evPayload) {
+        if (evPayload.has(EVENT_CMD_ID))
+            return evPayload.getAsJsonPrimitive(EVENT_CMD_ID).getAsString();
+        else
+            return "ec_" + eventId;
+    }
+
+    @Override
+    public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
+        
+        stream = stream.map(e -> {
+            JsonObject c = new JsonObject();
+            c.addProperty(CMD_ID, getCommandIdFromEvent(eventId, e));
+            c.add(CMD_PAYLOAD, e);
+            c.addProperty(CMD_FORMAT, "json");
+            c.addProperty(CMD_TS, System.currentTimeMillis());
+            return c;
+        });
+        
+        return handleEvents(stream);
+    }
+    
+    private TSink<JsonObject> handleEvents(TStream<JsonObject> stream) {
+        
+        if (echoCmds == null)
+            echoCmds = PlumbingStreams.isolate(stream, true);
+        else
+            echoCmds = PlumbingStreams.isolate(stream.union(echoCmds), true);
+        
+        return stream.sink(discard());
+    }
+
+    @Override
+    public TStream<JsonObject> commands(String... commands) {
+        if (commands.length == 0)
+            return echoCmds.filter(alwaysTrue());
+
+        TStream<JsonObject> cmd0 = echoCmds
+                .filter(j -> j.getAsJsonPrimitive(CMD_ID).getAsString().equals(commands[0]));
+
+        if (commands.length == 1)
+            return cmd0;
+
+        Set<TStream<JsonObject>> cmds = new HashSet<>();
+        for (int i = 1; i < commands.length; i++) {
+            final int idx = i;
+            cmds.add(echoCmds.filter(j -> j.getAsJsonPrimitive(CMD_ID).getAsString().equals(commands[idx])));
+        }
+
+        return cmd0.union(cmds);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/iot/src/test/java/quarks/test/apps/iot/IotDevicePubSubTest.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/quarks/test/apps/iot/IotDevicePubSubTest.java b/apps/iot/src/test/java/quarks/test/apps/iot/IotDevicePubSubTest.java
new file mode 100644
index 0000000..38555c4
--- /dev/null
+++ b/apps/iot/src/test/java/quarks/test/apps/iot/IotDevicePubSubTest.java
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.test.apps.iot;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+import quarks.apps.iot.IotDevicePubSub;
+import quarks.connectors.iot.IotDevice;
+import quarks.connectors.iot.QoS;
+import quarks.connectors.pubsub.service.ProviderPubSub;
+import quarks.connectors.pubsub.service.PublishSubscribeService;
+import quarks.execution.Job;
+import quarks.execution.Job.Action;
+import quarks.providers.direct.DirectProvider;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.plumbing.PlumbingStreams;
+import quarks.topology.tester.Condition;
+
+public class IotDevicePubSubTest {
+
+    
+
+    @Test
+    public void testIotDevicePubSubApp() throws Exception {
+        DirectProvider dp = new DirectProvider();
+
+        dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
+        
+        Topology iot = dp.newTopology("IotPubSub");
+        IotDevicePubSub iotApp = new IotDevicePubSub(new EchoIotDevice(iot));
+        
+        Topology app1 = dp.newTopology("App1");
+        
+        IotDevice app1Iot = iotApp.addIotDevice(app1);
+        
+        TStream<String> data = app1.strings("A", "B", "C");
+        
+        // Without this the tuple can be published and discarded before the
+        // subscriber is hooked up.
+        data = PlumbingStreams.blockingOneShotDelay(data, 500, TimeUnit.MILLISECONDS);
+        
+        TStream<JsonObject> events = data.map(
+                s -> {JsonObject j = new JsonObject(); j.addProperty("v", s); return j;});       
+        app1Iot.events(events, "ps1", QoS.FIRE_AND_FORGET);
+        
+        TStream<JsonObject> echoedCmds = app1Iot.commands("ec_ps1");
+        
+        TStream<String> ecs = echoedCmds.map(j -> j.getAsJsonObject(IotDevice.CMD_PAYLOAD).getAsJsonPrimitive("v").getAsString());
+        Condition<List<String>> tcEcho = app1.getTester().streamContents(ecs, "A", "B", "C"); // Expect all tuples
+        
+        Job jIot = dp.submit(iot.topology()).get();
+        Job jApp = dp.submit(app1.topology()).get();
+
+        for (int i = 0; i < 50 && !tcEcho.valid(); i++) {
+            Thread.sleep(50);
+        }
+
+        assertTrue(tcEcho.getResult().toString(), tcEcho.valid());        
+
+        jIot.stateChange(Action.CLOSE);
+        jApp.stateChange(Action.CLOSE);
+    }
+}


[5/5] incubator-quarks git commit: Fix apps classpath for Eclipse project

Posted by dj...@apache.org.
Fix apps classpath for Eclipse project


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/aaac8cb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/aaac8cb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/aaac8cb7

Branch: refs/heads/master
Commit: aaac8cb7748dc7caf569d3973540437dc810e0ad
Parents: afa455a
Author: Dan Debrunner <dj...@apache.org>
Authored: Tue Mar 8 11:28:04 2016 -0800
Committer: Daniel J. Debrunner <de...@us.ibm.com>
Committed: Wed Mar 9 15:36:02 2016 -0800

----------------------------------------------------------------------
 apps/.classpath | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/aaac8cb7/apps/.classpath
----------------------------------------------------------------------
diff --git a/apps/.classpath b/apps/.classpath
index 5e6e513..630e21b 100644
--- a/apps/.classpath
+++ b/apps/.classpath
@@ -7,5 +7,8 @@
 	<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
 	<classpathentry combineaccessrules="false" kind="src" path="/api"/>
 	<classpathentry combineaccessrules="false" kind="src" path="/connectors"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/providers"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/spi"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/runtime"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>


[3/5] incubator-quarks git commit: Cleanup for IotDevice

Posted by dj...@apache.org.
Cleanup for IotDevice


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/b4126254
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/b4126254
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/b4126254

Branch: refs/heads/master
Commit: b412625497eb81b4ec2b40eac4c48f07707ff498
Parents: 8fe7bf1
Author: Dan Debrunner <dj...@apache.org>
Authored: Tue Mar 8 10:07:21 2016 -0800
Committer: Daniel J. Debrunner <de...@us.ibm.com>
Committed: Wed Mar 9 15:36:02 2016 -0800

----------------------------------------------------------------------
 .../java/quarks/connectors/iot/IotDevice.java   | 52 +++++++++++++++++---
 .../quarks/connectors/iot/package-info.java     |  8 ++-
 2 files changed, 52 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/b4126254/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java b/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
index c98673a..70d0664 100644
--- a/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
+++ b/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
@@ -17,6 +17,11 @@ import quarks.topology.TopologyElement;
  * Generic Internet of Things device connector.
  */
 public interface IotDevice extends TopologyElement {
+    
+    /**
+     * Device event and command identifiers starting with {@value} are reserved for use by Quarks.
+     */
+    String RESERVED_ID_PREFIX = "quarks";
 
     /**
      * Publish a stream's tuples as device events.
@@ -35,7 +40,7 @@ public interface IotDevice extends TopologyElement {
      *            function to supply the event's delivery Quality of Service.
      * @return TSink sink element representing termination of this stream.
      */
-    public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+    TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
             UnaryOperator<JsonObject> payload,
             Function<JsonObject, Integer> qos) ;
     
@@ -53,17 +58,50 @@ public interface IotDevice extends TopologyElement {
      *            Event's delivery Quality of Service.
      * @return TSink sink element representing termination of this stream.
      */
-    public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) ;
+    TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) ;
+    
+    /**
+     * Command identifier key.
+     * Key is {@value}.
+     * 
+     * @see #commands(String...)
+     */
+    String CMD_ID = "command";
+    
+    /**
+     * Command timestamp (in milliseconds) key.
+     * Key is {@value}.
+     * 
+     * @see #commands(String...)
+     */
+    String CMD_TS = "tsms";
+    /**
+     * Command format key.
+     * Key is {@value}.
+     * 
+     * @see #commands(String...)
+     */
+    String CMD_FORMAT = "format";
+    /**
+     * Command payload key.
+     * If the command format is {@code json} then
+     * the key's value will be a {@code JsonObject},
+     * otherwise a {@code String}.
+     * Key is {@value}.
+     * 
+     * @see #commands(String...)
+     */
+    String CMD_PAYLOAD = "payload";
 
     /**
      * Create a stream of device commands as JSON objects.
      * Each command sent to the device matching {@code commands} will result in a tuple
      * on the stream. The JSON object has these keys:
      * <UL>
-     * <LI>{@code command} - Command identifier as a String</LI>
-     * <LI>{@code tsms} - Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI>
-     * <LI>{@code format} - Format of the command as a String</LI>
-     * <LI>{@code payload} - Payload of the command</LI>
+     * <LI>{@link #CMD_ID command} - Command identifier as a String</LI>
+     * <LI>{@link #CMD_TS tsms} - Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI>
+     * <LI>{@link #CMD_FORMAT format} - Format of the command as a String</LI>
+     * <LI>{@link #CMD_PAYLOAD payload} - Payload of the command</LI>
      * <UL>
      * <LI>If {@code format} is {@code json} then {@code payload} is JSON</LI>
      * <LI>Otherwise {@code payload} is String
@@ -75,5 +113,5 @@ public interface IotDevice extends TopologyElement {
      * stream will contain all device commands.
      * @return Stream containing device commands.
      */
-    public TStream<JsonObject> commands(String... commands);
+    TStream<JsonObject> commands(String... commands);
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/b4126254/connectors/iot/src/main/java/quarks/connectors/iot/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/quarks/connectors/iot/package-info.java b/connectors/iot/src/main/java/quarks/connectors/iot/package-info.java
index 8845516..1416f9c 100644
--- a/connectors/iot/src/main/java/quarks/connectors/iot/package-info.java
+++ b/connectors/iot/src/main/java/quarks/connectors/iot/package-info.java
@@ -23,6 +23,7 @@
  * The format for the payload is JSON, support for other payload formats may be added
  * in the future.
  * </LI>
+ * <P>
  * <LI>
  * <B>Device Commands</B> - A device {@link quarks.connectors.iot.IotDevice#commands(String...) subscribes} to <em>commands</em> from back-end systems
  * through the message hub. A device command consists of:
@@ -34,8 +35,13 @@
  * Device commands can be used to perform any action on the device including displaying information,
  * controlling the device (e.g. reduce maximum engine revolutions), controlling the Quarks application, etc.
  * </LI>
- * </UL>
  * The format for the payload is typically JSON, though other formats may be used.
+ * </UL>
+ * </P>
+ * <P>
+ * Device event and command identifiers starting with "{@link quarks.connectors.iot.IotDevice#RESERVED_ID_PREFIX quarks}"
+ * are reserved for use by Quarks.
+ * </P>
  */
 package quarks.connectors.iot;
 


[2/5] incubator-quarks git commit: QUARKS-4 [WIP] Initial version of iot device publish subscribe application

Posted by dj...@apache.org.
QUARKS-4 [WIP] Initial version of iot device publish subscribe application


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/7f5cf083
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/7f5cf083
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/7f5cf083

Branch: refs/heads/master
Commit: 7f5cf083fd4ecc9e8ac5922ca4392c143d0d55be
Parents: 2ca0f88
Author: Dan Debrunner <dj...@apache.org>
Authored: Mon Mar 7 18:34:07 2016 -0800
Committer: Daniel J. Debrunner <de...@us.ibm.com>
Committed: Wed Mar 9 15:36:01 2016 -0800

----------------------------------------------------------------------
 apps/.classpath                                 |  10 ++
 apps/.project                                   |  17 ++
 apps/iot/build.xml                              |  28 +++
 .../java/quarks/apps/iot/IotDevicePubSub.java   | 172 +++++++++++++++++++
 .../java/quarks/apps/iot/ProxyIotDevice.java    | 127 ++++++++++++++
 .../main/java/quarks/apps/iot/package-info.java |  23 +++
 .../java/quarks/connectors/iot/IotDevice.java   |   4 +-
 7 files changed, 379 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/7f5cf083/apps/.classpath
----------------------------------------------------------------------
diff --git a/apps/.classpath b/apps/.classpath
new file mode 100644
index 0000000..a44dd95
--- /dev/null
+++ b/apps/.classpath
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" path="iot/src/main/java"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/ext"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/api"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/connectors"/>
+	<classpathentry kind="output" path="bin"/>
+</classpath>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/7f5cf083/apps/.project
----------------------------------------------------------------------
diff --git a/apps/.project b/apps/.project
new file mode 100644
index 0000000..142fc54
--- /dev/null
+++ b/apps/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>apps</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/7f5cf083/apps/iot/build.xml
----------------------------------------------------------------------
diff --git a/apps/iot/build.xml b/apps/iot/build.xml
new file mode 100644
index 0000000..193fdb0
--- /dev/null
+++ b/apps/iot/build.xml
@@ -0,0 +1,28 @@
+<project name="quarks.apps.iot" default="all" 
+    xmlns:jacoco="antlib:org.jacoco.ant"
+    >
+    <description>
+        Applications utilizing IotDevice
+    </description>
+
+  <import file="../../common-build.xml"/>
+
+  <path id="compile.classpath">
+    <pathelement location="${lib}/quarks.api.topology.jar" />
+    <pathelement location="${quarks.connectors}/pubsub/lib/quarks.connectors.pubsub.jar" />
+    <pathelement location="${quarks.connectors}/iot/lib/quarks.connectors.iot.jar" />
+  </path>
+
+  <path id="test.compile.classpath">
+    <pathelement location="${jar}" />
+    <pathelement location="../../api/topology/test.classes" />
+    <path refid="compile.classpath"/>
+  </path>
+
+  <path id="test.classpath">
+    <pathelement location="${test.classes}" />
+    <path refid="test.compile.classpath"/>
+    <path refid="test.common.classpath" />
+  </path>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/7f5cf083/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java b/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
new file mode 100644
index 0000000..481e74e
--- /dev/null
+++ b/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
@@ -0,0 +1,172 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.apps.iot;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.gson.JsonObject;
+
+import quarks.connectors.iot.IotDevice;
+import quarks.connectors.pubsub.PublishSubscribe;
+import quarks.topology.TStream;
+import quarks.topology.TopologyElement;
+
+/**
+ * Application sharing an {@code IotDevice}
+ * through publish-subscribe.
+ * <BR>
+ * This application allows sharing an {@link IotDevice}
+ * across multiple running jobs. This allows a single
+ * connection to a back-end message hub to be shared across
+ * multiple independent applications, without having to
+ * build a single topology.
+ * <P>
+ * Applications coded to {@link IotDevice}
+ * obtain a topology specific {@code IotDevice} using
+ * {@link #addIotDevice(TopologyElement)}. This returned
+ * device will route events and commands to/from the
+ * actual message hub {@code IotDevice} through
+ * publish-subscribe.
+ * <P>
+ * An instance of this application is created by creating
+ * a new topology and then creating an {@link IotDevice}
+ * specific to the desired message hub. Then the application
+ * is created by calling {@link #IotDevicePubSub(IotDevice)}
+ * passing the {@code IotDevice}.
+ * <BR>
+ * Then additional independent applications (topologies) can be created
+ * and they create a proxy {@code IotDevice} for their topology using
+ * {@link #addIotDevice(TopologyElement)}. This proxy  {@code IotDevice}
+ * is then used to send device events and receive device commands in
+ * that topology.
+ * <BR>
+ * Once all the topologies have been declared they can be submitted.
+ * </P>
+ * <P>
+ * Limitations:
+ * <UL>
+ * <LI>
+ * Subscribing to all device commands (passing no arguments to {@link IotDevice#commands(String...)}) is
+ * not supported by the proxy {@code IotDevice}. 
+ * </LI>
+ * <LI>
+ * All applications that subscribe to device commands must be declared before
+ * the instance of this application is submitted.
+ * </LI>
+ * </UL>
+ * </P>
+ */
+public class IotDevicePubSub {
+	
+	/**
+	 * Events published to topic {@value} are sent as device
+	 * events using the actual message hub {@link IotDevice}.
+	 * <BR>
+	 * It is recommended that applications use the {@code IotDevice}
+	 * returned by {@link #addIotDevice(TopologyElement)} to
+	 * send events rather than publishing streams to this
+	 * topic.
+	 */
+    public static final String EVENTS = "quarks/iot/events";
+    
+	/**
+	 * Device commands are published to {@code quarks/iot/command/commandId}
+	 * by this application.
+	 * <BR>
+	 * It is recommended that applications use the {@code IotDevice}
+	 * returned by {@link #addIotDevice(TopologyElement)} to
+	 * receive commands rather than subscribing to streams with this
+	 * topic prefix.
+	 */
+    public static final String COMMANDS_PREFIX = "quarks/iot/command/";
+
+	
+    private final IotDevice device;
+    private final Set<String> publishedCommandTopics = new HashSet<>();
+    
+    /**
+     * Create an instance of this application using {@code device}
+     * as the device connection to a message hub.
+     * @param device Device to a message hub.
+     */
+    public IotDevicePubSub(IotDevice device) {
+        this.device = device;
+        createApplication();
+    }
+
+    /**
+     * Add a proxy {@code IotDevice} for the topology
+     * containing {@code te}.
+     * <P>
+     * Any events sent through the returned device
+     * are sent onto the message hub device through
+     * publish-subscribe.
+     * <BR>
+     * Subscribing to commands using the returned
+     * device will subscribe to commands received
+     * by the message hub device. 
+     * </P>
+     * 
+     * @param te Topology the returned device is contained in.
+     * @return Proxy device.
+     */
+    public IotDevice addIotDevice(TopologyElement te) {       
+        return new ProxyIotDevice(this, te.topology());
+    }
+    
+    /**
+     * Create initial application subscribing to
+     * events and delivering them to the message
+     * hub IotDevice.
+     * 
+     */
+    private void createApplication() {
+        
+        TStream<JsonObject> events = PublishSubscribe.subscribe(device,
+                EVENTS, JsonObject.class);
+        
+        device.events(events,
+                ew -> ew.getAsJsonPrimitive("eventId").getAsString(),
+                ew -> ew.getAsJsonObject("event"),
+                ew -> ew.getAsJsonPrimitive("qos").getAsInt());
+    }
+    
+    /**
+     * Subscribe to device commands.
+     * Keep track of which command identifiers are subscribed to
+     * so the stream for a command identifier is only created once.
+     * 
+     * @param commandIdentifier Identifier of the device command to subscribe to.
+     * @return Topic that needs to be subscribed to in the
+     * subscriber application.
+     */
+    synchronized String subscribeToCommand(String commandIdentifier) {
+        
+        String topic = COMMANDS_PREFIX + commandIdentifier;
+        // Set.add() records the topic and returns false if it was already recorded, so each command stream is published once.
+        if (publishedCommandTopics.add(topic)) {
+            TStream<JsonObject> systemStream = device.commands(commandIdentifier);
+            PublishSubscribe.publish(systemStream, topic, JsonObject.class);
+        }
+        
+        return topic;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/7f5cf083/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java b/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
new file mode 100644
index 0000000..0c3b185
--- /dev/null
+++ b/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
@@ -0,0 +1,127 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.apps.iot;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.gson.JsonObject;
+
+import quarks.connectors.iot.IotDevice;
+import quarks.connectors.pubsub.PublishSubscribe;
+import quarks.function.Function;
+import quarks.function.UnaryOperator;
+import quarks.topology.TSink;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+/**
+ * Proxy IotDevice that uses publish-subscribe
+ * and IotDevicePubSub application to communicate
+ * with a single IotDevice connected to a message hub.
+ */
+class ProxyIotDevice implements IotDevice {
+    
+	private IotDevicePubSub app;
+  
+    private final Topology topology;
+    
+    /**
+     * Create a proxy IotDevice
+     * @param app IotDevicePubSub application hosting the actual IotDevice.
+     * @param topology Topology of the subscribing application.
+     */
+    ProxyIotDevice(IotDevicePubSub app, Topology topology) {
+        this.app = app;
+        this.topology = topology;
+    }
+
+    @Override
+    public final Topology topology() {
+        return topology;
+    }
+
+    /**
+     * Publishes events derived from {@code stream} using the
+     * topic {@link IotDevicePubSub#EVENTS} as a JsonObject
+     * containing eventId, event, and qos keys.
+     */
+    @Override
+    public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+            UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
+        
+        stream = stream.map(event ->
+         {
+             JsonObject publishedEvent = new JsonObject();
+             
+             publishedEvent.addProperty("eventId", eventId.apply(event));
+             publishedEvent.add("event", payload.apply(event));
+             publishedEvent.addProperty("qos", qos.apply(event));
+                         
+             return publishedEvent;
+         });
+        
+        return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS, JsonObject.class);
+    }
+
+    /**
+     * Publishes events derived from {@code stream} using the
+     * topic {@link IotDevicePubSub#EVENTS} as a JsonObject
+     * containing eventId, event, and qos keys.
+     */
+    @Override
+    public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
+
+        stream = stream.map(event ->
+        {
+            JsonObject publishedEvent = new JsonObject();
+            
+            publishedEvent.addProperty("eventId", eventId);
+            publishedEvent.add("event", event);
+            publishedEvent.addProperty("qos", qos);
+                        
+            return publishedEvent;
+        });
+        
+        return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS, JsonObject.class);
+    }
+    
+    /**
+     * Subscribes to the device commands named by {@code commandIdentifiers}, one topic per identifier.
+     * Subscribing to all commands (no identifiers) is not yet supported: throws {@code UnsupportedOperationException}.
+     */
+    @Override
+    public TStream<JsonObject> commands(String... commandIdentifiers) {
+        if (commandIdentifiers.length == 0) {
+            throw new UnsupportedOperationException("Subscribing to all device commands is not supported");
+        }
+        final String firstTopic = app.subscribeToCommand(commandIdentifiers[0]);
+        TStream<JsonObject> commandsStream = PublishSubscribe.subscribe(this, firstTopic, JsonObject.class);
+        if (commandIdentifiers.length > 1) {
+            Set<TStream<JsonObject>> additionalStreams = new HashSet<>();
+            for (int i = 1; i < commandIdentifiers.length; i++) {
+                String topic = app.subscribeToCommand(commandIdentifiers[i]);
+                additionalStreams.add(PublishSubscribe.subscribe(this, topic, JsonObject.class));
+            }
+            commandsStream = commandsStream.union(additionalStreams);
+        }
+        return commandsStream;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/7f5cf083/apps/iot/src/main/java/quarks/apps/iot/package-info.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/quarks/apps/iot/package-info.java b/apps/iot/src/main/java/quarks/apps/iot/package-info.java
new file mode 100644
index 0000000..d017412
--- /dev/null
+++ b/apps/iot/src/main/java/quarks/apps/iot/package-info.java
@@ -0,0 +1,23 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Applications for use in an Internet of Things environment.
+ */
+package quarks.apps.iot;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/7f5cf083/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java b/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
index e117ebe..c98673a 100644
--- a/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
+++ b/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
@@ -61,7 +61,7 @@ public interface IotDevice extends TopologyElement {
      * on the stream. The JSON object has these keys:
      * <UL>
      * <LI>{@code command} - Command identifier as a String</LI>
-     * <LI>{@code tsms} - IoTF Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI>
+     * <LI>{@code tsms} - Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI>
      * <LI>{@code format} - Format of the command as a String</LI>
      * <LI>{@code payload} - Payload of the command</LI>
      * <UL>
@@ -71,7 +71,7 @@ public interface IotDevice extends TopologyElement {
      * </UL>
      * 
      * 
-     * @param commands Commands to include. If no commands are provided then the
+     * @param commands Command identifiers to include. If no command identifiers are provided then the
      * stream will contain all device commands.
      * @return Stream containing device commands.
      */