You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/17 18:13:18 UTC

[03/10] incubator-quarks git commit: Add initial IoT provider

Add initial IoT provider


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/27c4f0b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/27c4f0b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/27c4f0b0

Branch: refs/heads/master
Commit: 27c4f0b04f34b5e79f8d6b11a3159ebfba5c105d
Parents: cc6ee7b
Author: Dan Debrunner <dj...@debrunners.com>
Authored: Mon Mar 14 09:26:01 2016 -0700
Committer: Dan Debrunner <dj...@debrunners.com>
Committed: Wed Mar 16 15:17:22 2016 -0700

----------------------------------------------------------------------
 build.xml                                       |   1 +
 .../java/quarks/connectors/iot/Commands.java    |  24 +++
 providers/iot/.classpath                        |  15 ++
 providers/iot/.project                          |  17 ++
 providers/iot/build.xml                         |  31 ++++
 .../java/quarks/providers/iot/IotProvider.java  | 177 +++++++++++++++++++
 .../java/quarks/providers/iot/package-info.java |  25 +++
 test/.classpath                                 |   1 +
 test/fvtiot/build.xml                           |   3 +-
 .../java/quarks/test/fvt/iot/IotAppService.java |  23 ++-
 .../quarks/test/fvt/iot/IotProviderTest.java    |  67 +++++++
 11 files changed, 374 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index f311b40..641befd 100644
--- a/build.xml
+++ b/build.xml
@@ -116,6 +116,7 @@
         <ant dir="console/servlets" target="@{target}" useNativeBasedir="true" />
         <ant dir="apps/iot" target="@{target}" useNativeBasedir="true" />
         <ant dir="providers/development" target="@{target}" useNativeBasedir="true"/>
+        <ant dir="providers/iot" target="@{target}" useNativeBasedir="true"/>
         <ant dir="analytics/math3" target="@{target}" useNativeBasedir="true"/>
         <ant dir="analytics/sensors" target="@{target}" useNativeBasedir="true"/>
         <ant dir="samples/utils" target="@{target}" useNativeBasedir="true"/>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/connectors/iot/src/main/java/quarks/connectors/iot/Commands.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/quarks/connectors/iot/Commands.java b/connectors/iot/src/main/java/quarks/connectors/iot/Commands.java
new file mode 100644
index 0000000..70e3849
--- /dev/null
+++ b/connectors/iot/src/main/java/quarks/connectors/iot/Commands.java
@@ -0,0 +1,24 @@
+package quarks.connectors.iot;
+
+/**
+ * Command identifiers used by Quarks.
+ * 
+ * @see IotDevice#RESERVED_ID_PREFIX
+ */
+public interface Commands {
+    
+    /**
+     * Command identifier used for the control service.
+     * <BR>
+     * The command payload is used to invoke operations
+     * against control MBeans using an instance of
+     * {@link quarks.runtime.jsoncontrol.JsonControlService}.
+     * <BR>
+     * Value is {@value}.
+     * 
+     * @see quarks.execution.services.ControlService
+     * @see quarks.providers.iot.IotProvider
+     */
+    String CONTROL_SERVICE = IotDevice.RESERVED_ID_PREFIX + "Control";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/providers/iot/.classpath
----------------------------------------------------------------------
diff --git a/providers/iot/.classpath b/providers/iot/.classpath
new file mode 100644
index 0000000..bd17cf8
--- /dev/null
+++ b/providers/iot/.classpath
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" path="src/main/java"/>
+	<classpathentry kind="src" path="src/test/java"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/connectors"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/providers"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/api"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/ext"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/spi"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/runtime"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/apps"/>
+	<classpathentry kind="output" path="bin"/>
+</classpath>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/providers/iot/.project
----------------------------------------------------------------------
diff --git a/providers/iot/.project b/providers/iot/.project
new file mode 100644
index 0000000..1d8b2b3
--- /dev/null
+++ b/providers/iot/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>provider_iot</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/providers/iot/build.xml
----------------------------------------------------------------------
diff --git a/providers/iot/build.xml b/providers/iot/build.xml
new file mode 100644
index 0000000..1711636
--- /dev/null
+++ b/providers/iot/build.xml
@@ -0,0 +1,31 @@
+<project name="quarks.providers.iot" default="all" 
+    xmlns:jacoco="antlib:org.jacoco.ant"
+    >
+    <description>
+        Build the iot topology provider.
+    </description>
+
+  <import file="../../common-build.xml"/>
+
+  <path id="compile.classpath">
+    <pathelement location="${quarks.lib}/quarks.providers.direct.jar" />
+    <pathelement location="${quarks.lib}/quarks.runtime.jsoncontrol.jar" />
+    <pathelement location="${quarks.lib}/quarks.runtime.appservice.jar" />
+    <pathelement location="${quarks.connectors}/iot/lib/quarks.connectors.iot.jar" />
+    <pathelement location="${quarks.connectors}/pubsub/lib/quarks.connectors.pubsub.jar" />
+    <pathelement location="${quarks.apps}/iot/lib/quarks.apps.iot.jar" />
+  </path>
+
+  <path id="test.compile.classpath">
+    <pathelement location="${jar}" />
+    <pathelement location="../../api/topology/test.classes" />
+    <path refid="compile.classpath"/>
+  </path>
+
+  <path id="test.classpath">
+    <pathelement location="${test.classes}" />
+    <path refid="test.compile.classpath"/>
+    <path refid="test.common.classpath" />
+  </path>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/providers/iot/src/main/java/quarks/providers/iot/IotProvider.java
----------------------------------------------------------------------
diff --git a/providers/iot/src/main/java/quarks/providers/iot/IotProvider.java b/providers/iot/src/main/java/quarks/providers/iot/IotProvider.java
new file mode 100644
index 0000000..89d1604
--- /dev/null
+++ b/providers/iot/src/main/java/quarks/providers/iot/IotProvider.java
@@ -0,0 +1,177 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.providers.iot;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.gson.JsonObject;
+
+import quarks.apps.iot.IotDevicePubSub;
+import quarks.connectors.iot.Commands;
+import quarks.connectors.iot.IotDevice;
+import quarks.connectors.pubsub.service.ProviderPubSub;
+import quarks.connectors.pubsub.service.PublishSubscribeService;
+import quarks.execution.Configs;
+import quarks.execution.DirectSubmitter;
+import quarks.execution.Job;
+import quarks.execution.services.ControlService;
+import quarks.execution.services.ServiceContainer;
+import quarks.providers.direct.DirectProvider;
+import quarks.runtime.appservice.AppService;
+import quarks.runtime.jsoncontrol.JsonControlService;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.TopologyProvider;
+import quarks.topology.services.ApplicationService;
+
+/**
+ * IoT provider supporting multiple topologies with a single connection to a
+ * message hub. The provider uses a single {@link IotDevice} to communicate
+ * with an IoT scale message hub.
+ * {@link quarks.connectors.pubsub.PublishSubscribe Publish-subscribe} is
+ * used to allow multiple topologies to communicate through the single
+ * connection.
+ * 
+ */
+public abstract class IotProvider implements TopologyProvider,
+ DirectSubmitter<Topology, Job> {
+    
+    private final TopologyProvider provider;
+    private final DirectSubmitter<Topology, Job> submitter;
+    
+    private final List<Topology> systemApps = new ArrayList<>();
+
+    private JsonControlService controlService = new JsonControlService();
+    
+    protected IotProvider() {   
+        this(new DirectProvider());
+    }
+    
+    protected IotProvider(DirectProvider provider) {
+        this(provider, provider);
+    }
+
+    protected IotProvider(TopologyProvider provider, DirectSubmitter<Topology, Job> submitter) {
+        this.provider = provider;
+        this.submitter = submitter;
+        
+        registerControlService();
+        registerApplicationService();
+        registerPublishSubscribeService();
+        
+        createIotDeviceApp();
+        createIotCommandToControlApp();
+    }
+    
+    public ApplicationService getApplicationService() {
+        return getServices().getService(ApplicationService.class);
+    }
+    
+    @Override
+    public ServiceContainer getServices() {
+        return submitter.getServices();
+    }
+    
+    @Override
+    public final Topology newTopology() {
+        return provider.newTopology();
+    }
+    @Override
+    public final Topology newTopology(String name) {
+        return provider.newTopology(name);
+    }
+    @Override
+    public final Future<Job> submit(Topology topology) {
+        return submitter.submit(topology);
+    }
+    @Override
+    public final Future<Job> submit(Topology topology, JsonObject config) {
+        return submitter.submit(topology, config);
+    }
+
+    protected void registerControlService() {
+        getServices().addService(ControlService.class, getControlService());
+    }
+
+    protected void registerApplicationService() {
+        AppService.createAndRegister(this, this);
+    }
+    protected void registerPublishSubscribeService() {
+        getServices().addService(PublishSubscribeService.class, 
+                new ProviderPubSub());
+    }
+
+    protected JsonControlService getControlService() {
+        return controlService;
+    }
+    
+    /**
+     * Create application that connects to the message hub.
+     * Subscribes to device events and sends them to the message hub.
+     * Publishes device commands from the message hub.
+     * @see IotDevicePubSub
+     * @see #getMessageHubDevice(Topology)
+     */
+    protected void createIotDeviceApp() {
+        Topology topology = newTopology("QuarksIotDevice");
+             
+        IotDevice msgHub = getMessageHubDevice(topology);
+        IotDevicePubSub.createApplication(msgHub);
+        systemApps.add(topology);
+    }
+    
+    /**
+     * Create application that connects {@code quarksControl} device commands
+     * to the control service.
+     * 
+     * Subscribes to device
+     * commands of type {@link Commands#CONTROL_SERVICE}
+     * and sends the payload into the JSON control service
+     * to invoke the control operation.
+     */
+    protected void createIotCommandToControlApp() {
+        Topology topology = newTopology("QuarksIotCommandsToControl");
+        
+        IotDevice publishedDevice = IotDevicePubSub.addIotDevice(topology);
+
+        TStream<JsonObject> controlCommands = publishedDevice.commands(Commands.CONTROL_SERVICE);
+        controlCommands.sink(cmd -> {
+            try {
+                getControlService().controlRequest(cmd.getAsJsonObject(IotDevice.CMD_PAYLOAD));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+        
+        systemApps.add(topology);
+    }
+    
+    public void start() throws InterruptedException, ExecutionException {
+        for (Topology topology : systemApps) {
+            JsonObject config = new JsonObject();
+            config.addProperty(Configs.JOB_NAME, topology.getName());
+            submit(topology, config).get();
+        }
+    }
+
+    protected abstract IotDevice getMessageHubDevice(Topology topology);
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/providers/iot/src/main/java/quarks/providers/iot/package-info.java
----------------------------------------------------------------------
diff --git a/providers/iot/src/main/java/quarks/providers/iot/package-info.java b/providers/iot/src/main/java/quarks/providers/iot/package-info.java
new file mode 100644
index 0000000..5044636
--- /dev/null
+++ b/providers/iot/src/main/java/quarks/providers/iot/package-info.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * IoT provider that allows multiple applications to
+ * share an {@code IotDevice}.
+ */
+package quarks.providers.iot;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/test/.classpath
----------------------------------------------------------------------
diff --git a/test/.classpath b/test/.classpath
index 0b81af1..6c4d302 100644
--- a/test/.classpath
+++ b/test/.classpath
@@ -14,5 +14,6 @@
 	<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
 	<classpathentry combineaccessrules="false" kind="src" path="/apps"/>
 	<classpathentry combineaccessrules="false" kind="src" path="/runtime"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/provider_iot"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/test/fvtiot/build.xml
----------------------------------------------------------------------
diff --git a/test/fvtiot/build.xml b/test/fvtiot/build.xml
index a38ac29..2b00468 100644
--- a/test/fvtiot/build.xml
+++ b/test/fvtiot/build.xml
@@ -7,7 +7,7 @@
   <import file="../../common-build.xml"/>
 
   <path id="compile.classpath">
-    <pathelement location="${quarks.lib}/quarks.providers.direct.jar"/>
+    <pathelement location="${quarks.lib}/quarks.providers.iot.jar"/>
     <pathelement location="${quarks.lib}/quarks.runtime.appservice.jar"/>
     <pathelement location="${quarks.lib}/quarks.runtime.jsoncontrol.jar"/>
     <path refid="quarks.ext.classpath"/>
@@ -15,6 +15,7 @@
 
   <path id="test.compile.classpath">
     <pathelement location="${jar}" />
+    <pathelement location="../../apps/iot/test.classes" />
     <path refid="compile.classpath"/>
   </path>
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotAppService.java
----------------------------------------------------------------------
diff --git a/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotAppService.java b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotAppService.java
index 8b8c14e..12e3b81 100644
--- a/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotAppService.java
+++ b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotAppService.java
@@ -49,21 +49,26 @@ public class IotAppService {
         
         apps.registerTopology("AppOne", IotAppService::createApplicationOne);
         
-
-        JsonObject submitAppOne = new JsonObject();   
-        submitAppOne.addProperty(JsonControlService.TYPE_KEY, ApplicationServiceMXBean.TYPE);
-        submitAppOne.addProperty(JsonControlService.ALIAS_KEY, ApplicationService.ALIAS);
-        JsonArray args = new JsonArray();
-        args.add(new JsonPrimitive("AppOne"));
-        args.add(new JsonObject());
-        submitAppOne.addProperty(JsonControlService.OP_KEY, "submit");
-        submitAppOne.add(JsonControlService.ARGS_KEY, args);
+        JsonObject submitAppOne = newSubmitRequest("AppOne");
         
         JsonElement crr = control.controlRequest(submitAppOne);
         
         assertTrue(crr.getAsBoolean());
     }
     
+    public static JsonObject newSubmitRequest(String name) {
+        JsonObject submitApp = new JsonObject();   
+        submitApp.addProperty(JsonControlService.TYPE_KEY, ApplicationServiceMXBean.TYPE);
+        submitApp.addProperty(JsonControlService.ALIAS_KEY, ApplicationService.ALIAS);
+        JsonArray args = new JsonArray();
+        args.add(new JsonPrimitive(name));
+        args.add(new JsonObject());
+        submitApp.addProperty(JsonControlService.OP_KEY, "submit");
+        submitApp.add(JsonControlService.ARGS_KEY, args); 
+        
+        return submitApp;
+    }
+    
     public static void createApplicationOne(Topology topology, JsonObject config) {
         topology.strings("A", "B", "C").print();
     }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/27c4f0b0/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
----------------------------------------------------------------------
diff --git a/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
new file mode 100644
index 0000000..c84c3ae
--- /dev/null
+++ b/test/fvtiot/src/test/java/quarks/test/fvt/iot/IotProviderTest.java
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.test.fvt.iot;
+
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+import quarks.apps.iot.IotDevicePubSub;
+import quarks.connectors.iot.Commands;
+import quarks.connectors.iot.IotDevice;
+import quarks.providers.iot.IotProvider;
+import quarks.test.apps.iot.EchoIotDevice;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+public class IotProviderTest {
+    
+    @Test
+    public void testIotProvider() throws Exception {
+        
+        IotProvider provider = new IotProvider() {
+
+            @Override
+            protected IotDevice getMessageHubDevice(Topology topology) {
+                return new EchoIotDevice(topology);
+            }};
+        
+            provider.start();
+            
+            provider.getApplicationService().registerTopology("AppOne", IotAppService::createApplicationOne);
+
+            // Build the JSON control request that asks the application service to submit "AppOne".
+            JsonObject submitAppOne = IotAppService.newSubmitRequest("AppOne");
+            
+            
+            Topology submitter = provider.newTopology();
+            TStream<JsonObject> cmds = submitter.of(submitAppOne);
+            IotDevice publishedDevice = IotDevicePubSub.addIotDevice(submitter);
+            
+            publishedDevice.events(cmds, Commands.CONTROL_SERVICE, 0);
+            
+            provider.submit(submitter).get();
+    }
+    
+    
+    
+    public static void createApplicationOne(Topology topology, JsonObject config) {
+        topology.strings("A", "B", "C").print();
+    }
+}