You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/17 18:13:18 UTC
[03/10] incubator-quarks git commit: Add initial IoT provider
Add initial IoT provider
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/27c4f0b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/27c4f0b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/27c4f0b0
Branch: refs/heads/master
Commit: 27c4f0b04f34b5e79f8d6b11a3159ebfba5c105d
Parents: cc6ee7b
Author: Dan Debrunner <dj...@debrunners.com>
Authored: Mon Mar 14 09:26:01 2016 -0700
Committer: Dan Debrunner <dj...@debrunners.com>
Committed: Wed Mar 16 15:17:22 2016 -0700
----------------------------------------------------------------------
build.xml | 1 +
.../java/quarks/connectors/iot/Commands.java | 24 +++
providers/iot/.classpath | 15 ++
providers/iot/.project | 17 ++
providers/iot/build.xml | 31 ++++
.../java/quarks/providers/iot/IotProvider.java | 177 +++++++++++++++++++
.../java/quarks/providers/iot/package-info.java | 25 +++
test/.classpath | 1 +
test/fvtiot/build.xml | 3 +-
.../java/quarks/test/fvt/iot/IotAppService.java | 23 ++-
.../quarks/test/fvt/iot/IotProviderTest.java | 67 +++++++
11 files changed, 374 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index f311b40..641befd 100644
--- a/build.xml
+++ b/build.xml
@@ -116,6 +116,7 @@
<ant dir="console/servlets" target="@{target}" useNativeBasedir="true" />
<ant dir="apps/iot" target="@{target}" useNativeBasedir="true" />
<ant dir="providers/development" target="@{target}" useNativeBasedir="true"/>
+ <ant dir="providers/iot" target="@{target}" useNativeBasedir="true"/>
<ant dir="analytics/math3" target="@{target}" useNativeBasedir="true"/>
<ant dir="analytics/sensors" target="@{target}" useNativeBasedir="true"/>
<ant dir="samples/utils" target="@{target}" useNativeBasedir="true"/>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/connectors/iot/src/main/java/quarks/connectors/iot/Commands.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/quarks/connectors/iot/Commands.java b/connectors/iot/src/main/java/quarks/connectors/iot/Commands.java
new file mode 100644
index 0000000..70e3849
--- /dev/null
+++ b/connectors/iot/src/main/java/quarks/connectors/iot/Commands.java
@@ -0,0 +1,24 @@
+package quarks.connectors.iot;
+
+/**
+ * Command identifiers used by Quarks.
+ *
+ * @see IotDevice#RESERVED_ID_PREFIX
+ */
+public interface Commands {
+
+ /**
+ * Command identifier used for the control service.
+ * <BR>
+ * The command payload is used to invoke operations
+ * against control MBeans using an instance of
+ * {@link quarks.runtime.jsoncontrol.JsonControlService}.
+ * <BR>
+ * Value is {@value}.
+ *
+ * @see quarks.execution.services.ControlService
+ * @see quarks.providers.iot.IotProvider
+ */
+ String CONTROL_SERVICE = IotDevice.RESERVED_ID_PREFIX + "Control";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/providers/iot/.classpath
----------------------------------------------------------------------
diff --git a/providers/iot/.classpath b/providers/iot/.classpath
new file mode 100644
index 0000000..bd17cf8
--- /dev/null
+++ b/providers/iot/.classpath
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="src/main/java"/>
+ <classpathentry kind="src" path="src/test/java"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/connectors"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/providers"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/api"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/ext"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/spi"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/runtime"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/apps"/>
+ <classpathentry kind="output" path="bin"/>
+</classpath>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/providers/iot/.project
----------------------------------------------------------------------
diff --git a/providers/iot/.project b/providers/iot/.project
new file mode 100644
index 0000000..1d8b2b3
--- /dev/null
+++ b/providers/iot/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>provider_iot</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/providers/iot/build.xml
----------------------------------------------------------------------
diff --git a/providers/iot/build.xml b/providers/iot/build.xml
new file mode 100644
index 0000000..1711636
--- /dev/null
+++ b/providers/iot/build.xml
@@ -0,0 +1,31 @@
+<project name="quarks.providers.iot" default="all"
+ xmlns:jacoco="antlib:org.jacoco.ant"
+ >
+ <description>
+ Build the iot topology provider.
+ </description>
+
+ <import file="../../common-build.xml"/>
+
+ <path id="compile.classpath">
+ <pathelement location="${quarks.lib}/quarks.providers.direct.jar" />
+ <pathelement location="${quarks.lib}/quarks.runtime.jsoncontrol.jar" />
+ <pathelement location="${quarks.lib}/quarks.runtime.appservice.jar" />
+ <pathelement location="${quarks.connectors}/iot/lib/quarks.connectors.iot.jar" />
+ <pathelement location="${quarks.connectors}/pubsub/lib/quarks.connectors.pubsub.jar" />
+ <pathelement location="${quarks.apps}/iot/lib/quarks.apps.iot.jar" />
+ </path>
+
+ <path id="test.compile.classpath">
+ <pathelement location="${jar}" />
+ <pathelement location="../../api/topology/test.classes" />
+ <path refid="compile.classpath"/>
+ </path>
+
+ <path id="test.classpath">
+ <pathelement location="${test.classes}" />
+ <path refid="test.compile.classpath"/>
+ <path refid="test.common.classpath" />
+ </path>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/providers/iot/src/main/java/quarks/providers/iot/IotProvider.java
----------------------------------------------------------------------
diff --git a/providers/iot/src/main/java/quarks/providers/iot/IotProvider.java b/providers/iot/src/main/java/quarks/providers/iot/IotProvider.java
new file mode 100644
index 0000000..89d1604
--- /dev/null
+++ b/providers/iot/src/main/java/quarks/providers/iot/IotProvider.java
@@ -0,0 +1,177 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.providers.iot;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.gson.JsonObject;
+
+import quarks.apps.iot.IotDevicePubSub;
+import quarks.connectors.iot.Commands;
+import quarks.connectors.iot.IotDevice;
+import quarks.connectors.pubsub.service.ProviderPubSub;
+import quarks.connectors.pubsub.service.PublishSubscribeService;
+import quarks.execution.Configs;
+import quarks.execution.DirectSubmitter;
+import quarks.execution.Job;
+import quarks.execution.services.ControlService;
+import quarks.execution.services.ServiceContainer;
+import quarks.providers.direct.DirectProvider;
+import quarks.runtime.appservice.AppService;
+import quarks.runtime.jsoncontrol.JsonControlService;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.TopologyProvider;
+import quarks.topology.services.ApplicationService;
+
+/**
+ * IoT provider supporting multiple topologies with a single connection to a
+ * message hub. A provider that uses a single {@link IotDevice} to communicate
+ * with an IoT-scale message hub.
+ * {@link quarks.connectors.pubsub.PublishSubscribe Publish-subscribe} is
+ * used to allow multiple topologies to communicate through the single
+ * connection.
+ *
+ */
+public abstract class IotProvider implements TopologyProvider,
+ DirectSubmitter<Topology, Job> {
+
+ private final TopologyProvider provider;
+ private final DirectSubmitter<Topology, Job> submitter;
+
+ private final List<Topology> systemApps = new ArrayList<>();
+
+ private JsonControlService controlService = new JsonControlService();
+
+ protected IotProvider() {
+ this(new DirectProvider());
+ }
+
+ protected IotProvider(DirectProvider provider) {
+ this(provider, provider);
+ }
+
+ protected IotProvider(TopologyProvider provider, DirectSubmitter<Topology, Job> submitter) {
+ this.provider = provider;
+ this.submitter = submitter;
+
+ registerControlService();
+ registerApplicationService();
+ registerPublishSubscribeService();
+
+ createIotDeviceApp();
+ createIotCommandToControlApp();
+ }
+
+ public ApplicationService getApplicationService() {
+ return getServices().getService(ApplicationService.class);
+ }
+
+ @Override
+ public ServiceContainer getServices() {
+ return submitter.getServices();
+ }
+
+ @Override
+ public final Topology newTopology() {
+ return provider.newTopology();
+ }
+ @Override
+ public final Topology newTopology(String name) {
+ return provider.newTopology(name);
+ }
+ @Override
+ public final Future<Job> submit(Topology topology) {
+ return submitter.submit(topology);
+ }
+ @Override
+ public final Future<Job> submit(Topology topology, JsonObject config) {
+ return submitter.submit(topology, config);
+ }
+
+ protected void registerControlService() {
+ getServices().addService(ControlService.class, getControlService());
+ }
+
+ protected void registerApplicationService() {
+ AppService.createAndRegister(this, this);
+ }
+ protected void registerPublishSubscribeService() {
+ getServices().addService(PublishSubscribeService.class,
+ new ProviderPubSub());
+ }
+
+ protected JsonControlService getControlService() {
+ return controlService;
+ }
+
+ /**
+ * Create application that connects to the message hub.
+ * Subscribes to device events and sends them to the message hub.
+ * Publishes device commands from the message hub.
+ * @see IotDevicePubSub
+ * @see #getMessageHubDevice(Topology)
+ */
+ protected void createIotDeviceApp() {
+ Topology topology = newTopology("QuarksIotDevice");
+
+ IotDevice msgHub = getMessageHubDevice(topology);
+ IotDevicePubSub.createApplication(msgHub);
+ systemApps.add(topology);
+ }
+
+ /**
+ * Create application that connects {@code quarksControl} device commands
+ * to the control service.
+ *
+ * Subscribes to device
+ * commands of type {@link Commands#CONTROL_SERVICE}
+ * and sends the payload into the JSON control service
+ * to invoke the control operation.
+ */
+ protected void createIotCommandToControlApp() {
+ Topology topology = newTopology("QuarksIotCommandsToControl");
+
+ IotDevice publishedDevice = IotDevicePubSub.addIotDevice(topology);
+
+ TStream<JsonObject> controlCommands = publishedDevice.commands(Commands.CONTROL_SERVICE);
+ controlCommands.sink(cmd -> {
+ try {
+ getControlService().controlRequest(cmd.getAsJsonObject(IotDevice.CMD_PAYLOAD));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ systemApps.add(topology);
+ }
+
+ public void start() throws InterruptedException, ExecutionException {
+ for (Topology topology : systemApps) {
+ JsonObject config = new JsonObject();
+ config.addProperty(Configs.JOB_NAME, topology.getName());
+ submit(topology, config).get();
+ }
+ }
+
+ protected abstract IotDevice getMessageHubDevice(Topology topology);
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/providers/iot/src/main/java/quarks/providers/iot/package-info.java
----------------------------------------------------------------------
diff --git a/providers/iot/src/main/java/quarks/providers/iot/package-info.java b/providers/iot/src/main/java/quarks/providers/iot/package-info.java
new file mode 100644
index 0000000..5044636
--- /dev/null
+++ b/providers/iot/src/main/java/quarks/providers/iot/package-info.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * IoT provider that allows multiple applications to
+ * share an {@code IotDevice}.
+ */
+package quarks.providers.iot;
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/test/.classpath
----------------------------------------------------------------------
diff --git a/test/.classpath b/test/.classpath
index 0b81af1..6c4d302 100644
--- a/test/.classpath
+++ b/test/.classpath
@@ -14,5 +14,6 @@
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry combineaccessrules="false" kind="src" path="/apps"/>
<classpathentry combineaccessrules="false" kind="src" path="/runtime"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/provider_iot"/>
<classpathentry kind="output" path="bin"/>
</classpath>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/test/fvtiot/build.xml
----------------------------------------------------------------------
diff --git a/test/fvtiot/build.xml b/test/fvtiot/build.xml
index a38ac29..2b00468 100644
--- a/test/fvtiot/build.xml
+++ b/test/fvtiot/build.xml
@@ -7,7 +7,7 @@
<import file="../../common-build.xml"/>
<path id="compile.classpath">
- <pathelement location="${quarks.lib}/quarks.providers.direct.jar"/>
+ <pathelement location="${quarks.lib}/quarks.providers.iot.jar"/>
<pathelement location="${quarks.lib}/quarks.runtime.appservice.jar"/>
<pathelement location="${quarks.lib}/quarks.runtime.jsoncontrol.jar"/>
<path refid="quarks.ext.classpath"/>
@@ -15,6 +15,7 @@
<path id="test.compile.classpath">
<pathelement location="${jar}" />
+ <pathelement location="../../apps/iot/test.classes" />
<path refid="compile.classpath"/>
</path>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotAppService.java
----------------------------------------------------------------------
diff --git a/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotAppService.java b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotAppService.java
index 8b8c14e..12e3b81 100644
--- a/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotAppService.java
+++ b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotAppService.java
@@ -49,21 +49,26 @@ public class IotAppService {
apps.registerTopology("AppOne", IotAppService::createApplicationOne);
-
- JsonObject submitAppOne = new JsonObject();
- submitAppOne.addProperty(JsonControlService.TYPE_KEY, ApplicationServiceMXBean.TYPE);
- submitAppOne.addProperty(JsonControlService.ALIAS_KEY, ApplicationService.ALIAS);
- JsonArray args = new JsonArray();
- args.add(new JsonPrimitive("AppOne"));
- args.add(new JsonObject());
- submitAppOne.addProperty(JsonControlService.OP_KEY, "submit");
- submitAppOne.add(JsonControlService.ARGS_KEY, args);
+ JsonObject submitAppOne = newSubmitRequest("AppOne");
JsonElement crr = control.controlRequest(submitAppOne);
assertTrue(crr.getAsBoolean());
}
+ public static JsonObject newSubmitRequest(String name) {
+ JsonObject submitApp = new JsonObject();
+ submitApp.addProperty(JsonControlService.TYPE_KEY, ApplicationServiceMXBean.TYPE);
+ submitApp.addProperty(JsonControlService.ALIAS_KEY, ApplicationService.ALIAS);
+ JsonArray args = new JsonArray();
+ args.add(new JsonPrimitive(name));
+ args.add(new JsonObject());
+ submitApp.addProperty(JsonControlService.OP_KEY, "submit");
+ submitApp.add(JsonControlService.ARGS_KEY, args);
+
+ return submitApp;
+ }
+
public static void createApplicationOne(Topology topology, JsonObject config) {
topology.strings("A", "B", "C").print();
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
----------------------------------------------------------------------
diff --git a/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
new file mode 100644
index 0000000..c84c3ae
--- /dev/null
+++ b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.test.fvt.iot;
+
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+import quarks.apps.iot.IotDevicePubSub;
+import quarks.connectors.iot.Commands;
+import quarks.connectors.iot.IotDevice;
+import quarks.providers.iot.IotProvider;
+import quarks.test.apps.iot.EchoIotDevice;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+public class IotProviderTest {
+
+ @Test
+ public void testIotProvider() throws Exception {
+
+ IotProvider provider = new IotProvider() {
+
+ @Override
+ protected IotDevice getMessageHubDevice(Topology topology) {
+ return new EchoIotDevice(topology);
+ }};
+
+ provider.start();
+
+ provider.getApplicationService().registerTopology("AppOne", IotAppService::createApplicationOne);
+
+ // Build the control service request that submits the registered "AppOne" application.
+ JsonObject submitAppOne = IotAppService.newSubmitRequest("AppOne");
+
+
+ Topology submitter = provider.newTopology();
+ TStream<JsonObject> cmds = submitter.of(submitAppOne);
+ IotDevice publishedDevice = IotDevicePubSub.addIotDevice(submitter);
+
+ publishedDevice.events(cmds, Commands.CONTROL_SERVICE, 0);
+
+ provider.submit(submitter).get();
+ }
+
+
+
+ public static void createApplicationOne(Topology topology, JsonObject config) {
+ topology.strings("A", "B", "C").print();
+ }
+}