You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/08 04:20:22 UTC
incubator-quarks git commit: QUARKS-4 [WIP] Initial version of iot
device publish subscribe application
Repository: incubator-quarks
Updated Branches:
refs/heads/quarks-4 [created] e2f81e9e4
QUARKS-4 [WIP] Initial version of iot device publish subscribe application
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/e2f81e9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/e2f81e9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/e2f81e9e
Branch: refs/heads/quarks-4
Commit: e2f81e9e4a255d2d09a45682fe12aa137474f40c
Parents: aec314d
Author: Dan Debrunner <dj...@apache.org>
Authored: Mon Mar 7 18:34:07 2016 -0800
Committer: Dan Debrunner <dj...@apache.org>
Committed: Mon Mar 7 18:34:07 2016 -0800
----------------------------------------------------------------------
apps/.classpath | 10 ++
apps/.project | 17 ++
apps/iot/build.xml | 28 +++
.../java/quarks/apps/iot/IotDevicePubSub.java | 172 +++++++++++++++++++
.../java/quarks/apps/iot/ProxyIotDevice.java | 127 ++++++++++++++
.../main/java/quarks/apps/iot/package-info.java | 23 +++
.../java/quarks/connectors/iot/IotDevice.java | 4 +-
7 files changed, 379 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/e2f81e9e/apps/.classpath
----------------------------------------------------------------------
diff --git a/apps/.classpath b/apps/.classpath
new file mode 100644
index 0000000..a44dd95
--- /dev/null
+++ b/apps/.classpath
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="iot/src/main/java"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/ext"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/api"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/connectors"/>
+ <classpathentry kind="output" path="bin"/>
+</classpath>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/e2f81e9e/apps/.project
----------------------------------------------------------------------
diff --git a/apps/.project b/apps/.project
new file mode 100644
index 0000000..142fc54
--- /dev/null
+++ b/apps/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>apps</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/e2f81e9e/apps/iot/build.xml
----------------------------------------------------------------------
diff --git a/apps/iot/build.xml b/apps/iot/build.xml
new file mode 100644
index 0000000..193fdb0
--- /dev/null
+++ b/apps/iot/build.xml
@@ -0,0 +1,28 @@
+<project name="quarks.apps.iot" default="all"
+ xmlns:jacoco="antlib:org.jacoco.ant"
+ >
+ <description>
+ Applications utilizing IotDevice
+ </description>
+
+ <import file="../../common-build.xml"/>
+
+ <path id="compile.classpath">
+ <pathelement location="${lib}/quarks.api.topology.jar" />
+ <pathelement location="${quarks.connectors}/pubsub/lib/quarks.connectors.pubsub.jar" />
+ <pathelement location="${quarks.connectors}/iot/lib/quarks.connectors.iot.jar" />
+ </path>
+
+ <path id="test.compile.classpath">
+ <pathelement location="${jar}" />
+ <pathelement location="../../api/topology/test.classes" />
+ <path refid="compile.classpath"/>
+ </path>
+
+ <path id="test.classpath">
+ <pathelement location="${test.classes}" />
+ <path refid="test.compile.classpath"/>
+ <path refid="test.common.classpath" />
+ </path>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/e2f81e9e/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java b/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
new file mode 100644
index 0000000..481e74e
--- /dev/null
+++ b/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
@@ -0,0 +1,172 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.apps.iot;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.gson.JsonObject;
+
+import quarks.connectors.iot.IotDevice;
+import quarks.connectors.pubsub.PublishSubscribe;
+import quarks.topology.TStream;
+import quarks.topology.TopologyElement;
+
+/**
+ * Application sharing an {@code IotDevice}
+ * through publish-subscribe.
+ * <BR>
+ * This application allows sharing an {@link IotDevice}
+ * across multiple running jobs. This allows a single
+ * connection to a back-end message hub to be shared across
+ * multiple independent applications, without having to
+ * build a single topology.
+ * <P>
+ * Applications coded to {@link IotDevice}
+ * obtain a topology specific {@code IotDevice} using
+ * {@link #addIotDevice(TopologyElement)}. This returned
+ * device will route events and commands to/from the
+ * actual message hub {@code IotDevice} through
+ * publish-subscribe.
+ * <P>
+ * An instance of this application is created by creating
+ * a new topology and then creating a {@link IotDevice}
+ * specific to the desired message hub. Then the application
+ * is created by calling {@link #IotDevicePubSub(IotDevice)}
+ * passing the {@code IotDevice}.
+ * <BR>
+ * Then additional independent applications (topologies) can be created
+ * and they create a proxy {@code IotDevice} for their topology using
+ * {@link #addIotDevice(TopologyElement)}. This proxy {@code IotDevice}
+ * is then used to send device events and receive device commands in
+ * that topology.
+ * <BR>
+ * Once all the topologies have been declared they can be submitted.
+ * </P>
+ * <P>
+ * Limitations:
+ * <UL>
+ * <LI>
+ * Subscribing to all device commands (passing no arguments to {@link IotDevice#commands(String...)} is
+ * not supported by the proxy {@code IotDevice}.
+ * </LI>
+ * <LI>
+ * All applications that subscribe to device commands must be declared before
+ * the instance of this application is submitted.
+ * </LI>
+ * </UL>
+ * </P>
+ */
+public class IotDevicePubSub {
+
+ /**
+ * Events published to topic {@value} are sent as device
+ * events using the actual message hub {@link IotDevice}.
+ * <BR>
+ * it is recommended applications use the {@code IotDevice}
+ * returned by {@link #addIotDevice(TopologyElement)} to
+ * send events rather than publishing streams to this
+ * topic.
+ */
+ public static final String EVENTS = "quarks/iot/events";
+
+ /**
+ * Device commands are published to {@code quarks/iot/command/commandId}
+ * by this application.
+ * <BR>
+ * it is recommended applications use the {@code IotDevice}
+ * returned by {@link #addIotDevice(TopologyElement)} to
+ * send events rather than subscribing to streams with this
+ * topic prefix.
+ */
+ public static final String COMMANDS_PREFIX = "quarks/iot/command/";
+
+
+ private final IotDevice device;
+ private final Set<String> publishedCommandTopics = new HashSet<>();
+
+ /**
+ * Create an instance of this application using {@code device}
+ * as the device connection to a message hub.
+ * @param device Device to a message hub.
+ */
+ public IotDevicePubSub(IotDevice device) {
+ this.device = device;
+ createApplication();
+ }
+
+ /**
+ * Add a proxy {@code IotDevice} for the topology
+ * containing {@code te}.
+ * <P>
+ * Any events sent through the returned device
+ * are sent onto the message hub device through
+ * publish-subscribe.
+ * <BR>
+ * Subscribing to commands using the returned
+ * device will subscribe to commands received
+ * by the message hub device.
+ * </P>
+ *
+ * @param te Topology the returned device is contained in.
+ * @return Proxy device.
+ */
+ public IotDevice addIotDevice(TopologyElement te) {
+ return new ProxyIotDevice(this, te.topology());
+ }
+
+ /**
+ * Create initial application subscribing to
+ * events and delivering them to the message
+ * hub IotDevice.
+ *
+ */
+ private void createApplication() {
+
+ TStream<JsonObject> events = PublishSubscribe.subscribe(device,
+ EVENTS, JsonObject.class);
+
+ device.events(events,
+ ew -> ew.getAsJsonPrimitive("eventId").getAsString(),
+ ew -> ew.getAsJsonObject("event"),
+ ew -> ew.getAsJsonPrimitive("qos").getAsInt());
+ }
+
+ /**
+ * Subscribe to device commands.
+ * Keep track of which command identifiers are subscribed to
+ * so the stream for a command identifier is only created once.
+ *
+ * @param commandIdentifier
+ * @return Topic that needs to be subscribed to in the
+ * subscriber application.
+ */
+ synchronized String subscribeToCommand(String commandIdentifier) {
+
+ String topic = COMMANDS_PREFIX + commandIdentifier;
+
+ if (!publishedCommandTopics.contains(topic)) {
+ TStream<JsonObject> systemStream = device.commands(commandIdentifier);
+ PublishSubscribe.publish(systemStream, topic, JsonObject.class);
+ }
+
+ return topic;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/e2f81e9e/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java b/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
new file mode 100644
index 0000000..0c3b185
--- /dev/null
+++ b/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
@@ -0,0 +1,127 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.apps.iot;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.gson.JsonObject;
+
+import quarks.connectors.iot.IotDevice;
+import quarks.connectors.pubsub.PublishSubscribe;
+import quarks.function.Function;
+import quarks.function.UnaryOperator;
+import quarks.topology.TSink;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+/**
+ * Proxy IotDevice that uses publish-subscribe
+ * and IotDevicePubSub application to communicate
+ * with a single IotDevice connected to a message hub.
+ */
+class ProxyIotDevice implements IotDevice {
+
+ private IotDevicePubSub app;
+
+ private final Topology topology;
+
+ /**
+ * Create a proxy IotDevice
+ * @param app IotDevicePubSub application hosting the actual IotDevice.
+ * @param topology Topology of the subscribing application.
+ */
+ ProxyIotDevice(IotDevicePubSub app, Topology topology) {
+ this.app = app;
+ this.topology = topology;
+ }
+
+ @Override
+ public final Topology topology() {
+ return topology;
+ }
+
+ /**
+ * Publishes events derived from {@code stream} using the
+ * topic {@link IotDevicePubSub#EVENTS} as a JsonObject
+ * containing eventId, event, and qos keys.
+ */
+ @Override
+ public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+ UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
+
+ stream = stream.map(event ->
+ {
+ JsonObject publishedEvent = new JsonObject();
+
+ publishedEvent.addProperty("eventId", eventId.apply(event));
+ publishedEvent.add("event", payload.apply(event));
+ publishedEvent.addProperty("qos", qos.apply(event));
+
+ return publishedEvent;
+ });
+
+ return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS, JsonObject.class);
+ }
+
+ /**
+ * Publishes events derived from {@code stream} using the
+ * topic {@link IotDevicePubSub#EVENTS} as a JsonObject
+ * containing eventId, event, and qos keys.
+ */
+ @Override
+ public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
+
+ stream = stream.map(event ->
+ {
+ JsonObject publishedEvent = new JsonObject();
+
+ publishedEvent.addProperty("eventId", eventId);
+ publishedEvent.add("event", event);
+ publishedEvent.addProperty("qos", qos);
+
+ return publishedEvent;
+ });
+
+ return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS, JsonObject.class);
+ }
+
+ /**
+ * Subscribes to commands.
+ * Doesn't yet support subscribing to all commands.
+ */
+ @Override
+ public TStream<JsonObject> commands(String... commandIdentifiers) {
+
+ final String firstTopic = app.subscribeToCommand(commandIdentifiers[0]);
+ TStream<JsonObject> commandsStream = PublishSubscribe.subscribe(this, firstTopic, JsonObject.class);
+
+ if (commandIdentifiers.length > 1) {
+ Set<TStream<JsonObject>> additionalStreams = new HashSet<>();
+ for (int i = 1; i < commandIdentifiers.length; i++) {
+ String topic = app.subscribeToCommand(commandIdentifiers[i]);
+ additionalStreams.add(PublishSubscribe.subscribe(this, topic, JsonObject.class));
+ }
+ commandsStream = commandsStream.union(additionalStreams);
+ }
+
+ return commandsStream;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/e2f81e9e/apps/iot/src/main/java/quarks/apps/iot/package-info.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/quarks/apps/iot/package-info.java b/apps/iot/src/main/java/quarks/apps/iot/package-info.java
new file mode 100644
index 0000000..d017412
--- /dev/null
+++ b/apps/iot/src/main/java/quarks/apps/iot/package-info.java
@@ -0,0 +1,23 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Applications for use in an Internet of Things environment.
+ */
+package quarks.apps.iot;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/e2f81e9e/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java b/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
index e117ebe..c98673a 100644
--- a/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
+++ b/connectors/iot/src/main/java/quarks/connectors/iot/IotDevice.java
@@ -61,7 +61,7 @@ public interface IotDevice extends TopologyElement {
* on the stream. The JSON object has these keys:
* <UL>
* <LI>{@code command} - Command identifier as a String</LI>
- * <LI>{@code tsms} - IoTF Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI>
+ * <LI>{@code tsms} - Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI>
* <LI>{@code format} - Format of the command as a String</LI>
* <LI>{@code payload} - Payload of the command</LI>
* <UL>
@@ -71,7 +71,7 @@ public interface IotDevice extends TopologyElement {
* </UL>
*
*
- * @param commands Commands to include. If no commands are provided then the
+ * @param commands Command identifiers to include. If no command identifiers are provided then the
* stream will contain all device commands.
* @return Stream containing device commands.
*/