You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:08 UTC
[07/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/applicationTemplate.properties
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/applicationTemplate.properties b/samples/apps/src/main/java/edgent/samples/apps/applicationTemplate.properties
deleted file mode 100644
index c96ad00..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/applicationTemplate.properties
+++ /dev/null
@@ -1,81 +0,0 @@
-# A template file for Application Configuration properties
-#
-# The default Edgent topology provider is DirectProvider
-#topology.provider=edgent.providers.development.DevelopmentProvider
-#
-application.name=MyAnalytics
-#
-
-# =========================================================================
-# Application stream logging configuration
-# Where the app puts its stream logs.
-# The directory will be created when the topology
-# runs if it doesn't already exist.
-application.log.dir=/tmp/MyAnalytics/logs
-
-# =========================================================================
-# Application "ranges" - e.g., for threshold detections
-# Specify values generated by Range.toString():
-# <lowerBoundType><lowerBound>..<upperBound><upperBoundType>
-# where
-# lowerBoundType is "[" inclusive or "(" exclusive
-# upperBoundType is "]" inclusive or ")" exclusive
-# lowerBound or upperBound is "*" for open ranges,
-# e.g., [*..50] for "atMost" 50
-#
-sensor1.range.outside1hzMeanRange=[124..129]
-
-# =========================================================================
-# MQTT Device and Connector configuration info.
-#
-# MQTT Device -- See edgent.connectors.mqtt.device.MqttDevice for all
-# of the properties.
-#
-# An optional topic prefix. It can be used to isolate users or applications
-# in shared MQTT broker configurations. By default it is incorporated
-# into device topics and the MQTT clientId.
-# If you use a public MQTT broker you may want to change the topic
-# prefix so it is still unique for you but doesn't include the
-# user name or application name.
-mqttDevice.topic.prefix=ibm.xyzzy-streams.samples/user/{user.name}/{application.name}/
-#
-# The device id used for identifying the device's events and commands
-# in the MQTT topic namespace.
-# By default it also gets incorporated into the MQTT clientId value.
-mqttDevice.id=012345
-#
-# The MQTT clientId. Only one instance of a MqttDevice can connect
-# to the MQTT broker with a given clientId.
-#mqttDevice.mqtt.clientId={mqttDevice.topic.prefix}id/{mqttDevice.id}
-#
-# MQTT Connector See edgent.connectors.mqtt.MqttConfig.fromProperties()
-#
-# The default configuration is for a local MQTT broker.
-# See mosquitto.org for instructions on downloading a MQTT broker.
-# Or use some other MQTT broker available in your environment.
-mqtt.serverURLs=tcp://localhost:1883
-#
-# Alternatively, there are some public MQTT brokers available to experiment with.
-# Their availability status isn't guaranteed. If you're unable to connect
-# to the broker, it's likely that it isn't up or your firewalls don't
-# allow you to connect. DO NOT PUBLISH ANYTHING SENSITIVE - anyone
-# can be listing.
-#mqtt.serverURLs=tcp://iot.eclipse.org:1883
-#mqtt.serverURLs=tcp://test.mosquitto.org:1883
-#
-#mqtt.userName=xyzzy
-#mqtt.password=myMosquittoPw
-
-# =========================================================================
-# Patterns for identifying which streams to trace to System.out
-# To enable use include.csv and/or includes.regex.
-# To exclude an otherwise included file, use excludes.csv and/or excludes.regex
-#
-# Some tracing labels
-# sensor1.raw1khz,sensor1.j1khz,sensor1.j1hzStats,sensor1.outside1hzMeanRange*,
-# sensor1.periodicLastN*
-#
-#stream.tracing.includes.csv=sensor1.j1hzStats
-stream.tracing.includes.regex=sensor1.outside1hzMeanRange.*
-#stream.tracing.excludes.regex=.*
-#stream.tracing.excludes.csv=sensor1.raw1khz
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/mqtt/AbstractMqttApplication.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/mqtt/AbstractMqttApplication.java b/samples/apps/src/main/java/edgent/samples/apps/mqtt/AbstractMqttApplication.java
deleted file mode 100644
index 46fa96d..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/mqtt/AbstractMqttApplication.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.apps.mqtt;
-
-import static edgent.connectors.iot.IotDevice.CMD_PAYLOAD;
-
-import java.util.Arrays;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.mqtt.iot.MqttDevice;
-import edgent.samples.apps.AbstractApplication;
-import edgent.samples.apps.ApplicationUtilities;
-import edgent.samples.apps.TopologyProviderFactory;
-import edgent.topology.Topology;
-
-/**
- * An MQTT Application base class.
- * <p>
- * Application instances need to:
- * <ul>
- * <li>define an implementation for {@link #buildTopology(Topology)}</li>
- * <li>call {@link #run()} to build and submit the topology for execution.</li>
- * </ul>
- * <p>
- * The class provides some common processing needs:
- * <ul>
- * <li>Support for an external configuration file</li>
- * <li>Provides a {@link TopologyProviderFactory}</li>
- * <li>Provides a {@link ApplicationUtilities}</li>
- * <li>Provides a {@link MqttDevice}</li>
- * </ul>
- */
-public abstract class AbstractMqttApplication extends AbstractApplication {
-
- private MqttDevice mqttDevice;
-
- public AbstractMqttApplication(String propsPath) throws Exception {
- super(propsPath);
- }
-
- @Override
- protected void preBuildTopology(Topology t) {
- // Add an MQTT device communication manager to the topology
- updateTopicPrefix();
- mqttDevice = new MqttDevice(t, props);
- System.out.println("MqttDevice serverURLs " + Arrays.toString(mqttDevice.getMqttConfig().getServerURLs()));
- System.out.println("MqttDevice clientId " + mqttDevice.getMqttConfig().getClientId());
- System.out.println("MqttDevice deviceId " + props.getProperty("mqttDevice.id"));
- System.out.println("MqttDevice event topic pattern " + mqttDevice.eventTopic(null));
- System.out.println("MqttDevice command topic pattern " + mqttDevice.commandTopic(null));
- }
-
- /**
- * Get the application's MqttDevice
- * @return the MqttDevice
- */
- public MqttDevice mqttDevice() {
- return mqttDevice;
- }
-
- private void updateTopicPrefix() {
- String val = props.getProperty("mqttDevice.topic.prefix");
- if (val != null) {
- val = val.replace("{user.name}", System.getProperty("user.name"));
- val = val.replace("{application.name}", props.getProperty("application.name"));
- props.setProperty("mqttDevice.topic.prefix", val);
- }
- }
-
- /**
- * Compose a MqttDevice eventId for the sensor.
- * @param sensorId the sensor id
- * @param eventId the sensor's eventId
- * @return the device eventId
- */
- public String sensorEventId(String sensorId, String eventId) {
- return sensorId + "." + eventId;
- }
-
- /**
- * Compose a MqttDevice commandId for the sensor
- * @param sensorId the sensor id
- * @param commandId the sensor's commandId
- * @return the device commandId
- */
- public String commandId(String sensorId, String commandId) {
- return sensorId + "." + commandId;
- }
-
- /**
- * Extract a simple string valued command arg
- * from a {@link MqttDevice#commands(String...)} returned
- * JsonObject tuple.
- * <p>
- * Interpret the JsonObject's embedded payload as a JsonObject with a single
- * "value" property.
- * @param jo the command tuple.
- * @return the command's argument value
- */
- public String getCommandValueString(JsonObject jo) {
- return jo.get(CMD_PAYLOAD).getAsJsonObject().get("value").getAsString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/mqtt/DeviceCommsApp.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/mqtt/DeviceCommsApp.java b/samples/apps/src/main/java/edgent/samples/apps/mqtt/DeviceCommsApp.java
deleted file mode 100644
index e399eca..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/mqtt/DeviceCommsApp.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.apps.mqtt;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.iot.QoS;
-import edgent.connectors.mqtt.MqttStreams;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.json.JsonFunctions;
-
-/**
- * An MQTT Device Communications client for watching device events
- * and sending commands.
- * <p>
- * This is an "application properties" aware client that gets MQTT configuration
- * from an Edgent sample app application configuration properties file.
- * <p>
- * This client avoids the need for other MQTT clients (e.g., from a mosquitto
- * installation) to observe and control the applications.
- */
-public class DeviceCommsApp extends AbstractMqttApplication {
-
- private static final String usage = "Usage: watch | send <cmdLabel> <cmdArg>";
-
- private String mode;
- private String cmdLabel;
- private String cmdArg;
-
- public static void main(String[] args) throws Exception {
- if (args.length < 1)
- throw new Exception("missing pathname to application properties file");
-
- try {
- int i = 0;
- DeviceCommsApp application = new DeviceCommsApp(args[i++]);
- String mode = args[i++];
- if (!("watch".equals(mode) || "send".equals(mode))) {
- throw new IllegalArgumentException("Unsupport mode: "+application.mode);
- }
- application.mode = mode;
- if (application.mode.equals("send")) {
- application.cmdLabel = args[i++];
- application.cmdArg = args[i++];
- }
-
- application.run();
- }
- catch (IllegalArgumentException | IndexOutOfBoundsException e) {
- throw new IllegalArgumentException(e.getMessage()
- +"\n"+usage);
- }
- }
-
- /**
- * Create an application instance.
- * @param propsPath pathname to an application configuration file
- * @throws Exception
- */
- DeviceCommsApp(String propsPath) throws Exception {
- super(propsPath);
- }
-
- @Override
- protected void buildTopology(Topology t) {
- mqttDevice().getMqttConfig().setClientId(null);
- MqttStreams mqtt = new MqttStreams(t, () -> mqttDevice().getMqttConfig());
- if (mode.equals("send")) {
- String topic = mqttDevice().commandTopic(cmdLabel);
- JsonObject jo = new JsonObject();
- jo.addProperty("value", cmdArg);
- System.out.println("Publishing command: topic="+topic+" value="+jo);
- TStream<String> cmd = t.strings(JsonFunctions.asString().apply(jo));
- mqtt.publish(cmd, topic, QoS.FIRE_AND_FORGET, false/*retain*/);
- // Hmm... the paho MQTT *non-daemon* threads prevent the app
- // from exiting after returning from main() following job submit().
- // Lacking MqttStreams.shutdown() or such...
- // Delay a bit and then explicitly exit(). Ugh.
- cmd.sink(tuple -> {
- try {
- Thread.sleep(3*1000);
- } catch (Exception e) { }
- System.exit(0); });
- }
- else if (mode.equals("watch")) {
- String topicFilter = mqttDevice().eventTopic(null);
- System.out.println("Watching topic filter "+topicFilter);
- TStream<String> events = mqtt.subscribe(topicFilter, QoS.FIRE_AND_FORGET,
- (topic,payload) -> {
- String s = "\n# topic "+topic;
- s += "\n" + new String(payload);
- return s;
- });
- events.print();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/mqtt/package-info.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/mqtt/package-info.java b/samples/apps/src/main/java/edgent/samples/apps/mqtt/package-info.java
deleted file mode 100644
index 66eb514..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/mqtt/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Base support for Edgent MQTT based application samples.
- * <p>
- * This package builds on {@code edgent.samples.apps} providing
- * additional common capabilities in the area of MQTT based device appliations.
- */
-package edgent.samples.apps.mqtt;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/package-info.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/package-info.java b/samples/apps/src/main/java/edgent/samples/apps/package-info.java
deleted file mode 100644
index 102006b..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/package-info.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Support for some more complex Edgent application samples.
- * <p>
- * This package provides some commonly needed capabilities particularly in
- * the area of an external configuration description influence on various
- * things.
- * <p>
- * Focal areas:
- * <ul>
- * <li>{@link edgent.samples.apps.AbstractApplication} - a base class for
- * Edgent applications providing commonly needed features.
- * </li>
- * <li>{@link edgent.samples.apps.TopologyProviderFactory} - a configuration
- * driven factory for an Edgent topology provider.
- * </li>
- * <li>{@link edgent.samples.apps.ApplicationUtilities} - some
- * general configuration driven utilities.
- * </li>
- * <li>{@link edgent.samples.apps.JsonTuples} - utilities for wrapping
- * sensor samples in a JsonObject and operating on it.
- * </li>
- * </ul>
- */
-package edgent.samples.apps;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/Sensor1.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/Sensor1.java b/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/Sensor1.java
deleted file mode 100644
index 78fefa9..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/Sensor1.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.apps.sensorAnalytics;
-
-import static edgent.analytics.math3.stat.Statistic.MAX;
-import static edgent.analytics.math3.stat.Statistic.MEAN;
-import static edgent.analytics.math3.stat.Statistic.MIN;
-import static edgent.analytics.math3.stat.Statistic.STDDEV;
-import static edgent.samples.apps.JsonTuples.KEY_ID;
-import static edgent.samples.apps.JsonTuples.KEY_READING;
-import static edgent.samples.apps.JsonTuples.KEY_TS;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.math3.util.Pair;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-
-import edgent.analytics.sensors.Range;
-import edgent.analytics.sensors.Ranges;
-import edgent.connectors.iot.QoS;
-import edgent.function.Supplier;
-import edgent.samples.apps.JsonTuples;
-import edgent.samples.utils.sensor.PeriodicRandomSensor;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-
-/**
- * Analytics for "Sensor1".
- * <p>
- * This sample demonstrates some common continuous sensor analytic themes.
- * <p>
- * In this case we have a simulated sensor producing 1000 samples per second
- * of an integer type in the range of 0-255.
- * <p>
- * The processing pipeline created is roughly:
- * <ul>
- * <li>Batched Data Reduction - reduce the sensor's 1000 samples per second
- * down to 1 sample per second simple statistical aggregation of the readings.
- * </li>
- * <li>Compute historical information - each 1hz sample is augmented
- * with a 30 second trailing average of the 1hz readings.
- * </li>
- * <li>Threshold detection - each 1hz sample's value is compared
- * against a target range and outliers are identified.
- * </li>
- * <li>Local logging - outliers are logged to a local file
- * </li>
- * <li>Publishing results to a MQTT broker:
- * <ul>
- * <li>when enabled, invdividual outliers are published.</li>
- * <li>Every 30 seconds a list of the last 10 outliers is published.</li>
- * </ul>
- * </li>
- * </ul>
- * <p>
- * The sample also demonstrates:
- * <ul>
- * <li>Dynamic configuration control - subscribe to a MQTT broker
- * to receive commands to adjust the threshold detection range value.
- * </li>
- * <li>Generally, the configuration of the processing is driven via an
- * external configuration description.
- * </li>
- * <li>Conditional stream tracing - configuration controlled inclusion of tracing.
- * </li>
- * <li>Use of {@link TStream#tag(String...)} to improve information provided by
- * the Edgent DevelopmentProvider console.</li>
- * </ul>
- */
-public class Sensor1 {
- private final SensorAnalyticsApplication app;
- private final Topology t;
- private final String sensorId = "sensor1";
-
- public Sensor1(Topology t, SensorAnalyticsApplication app) {
- this.t = t;
- this.app = app;
- }
-
- /**
- * Add the sensor's analytics to the topology.
- */
- public void addAnalytics() {
-
- // Need synchronization for set/get of dynamically changeable values.
- AtomicReference<Range<Integer>> range = new AtomicReference<>();
- AtomicReference<Boolean> isPublish1hzOutsideRange = new AtomicReference<>();
-
- // Initialize the controls
- range.set(app.utils().getRangeInteger(sensorId, "outside1hzMeanRange"));
- isPublish1hzOutsideRange.set(false);
-
- // Handle the sensor's device commands
- app.mqttDevice().commands(commandId("set1hzMeanRangeThreshold"))
- .tag(commandId("set1hzMeanRangeThresholdCmd"))
- .sink(jo -> {
- Range<Integer> newRange = Ranges.valueOfInteger(getCommandValue(jo));
- System.out.println("===== Changing range to "+newRange+" ======");
- range.set(newRange);
- });
- app.mqttDevice().commands(commandId("setPublish1hzOutsideRange"))
- .tag(commandId("setPublish1hzOutsideRangeCmd"))
- .sink(jo -> {
- Boolean b = new Boolean(getCommandValue(jo));
- System.out.println("===== Changing isPublish1hzOutsideRange to "+b+" ======");
- isPublish1hzOutsideRange.set(b);
- });
-
- // Create a raw simulated sensor stream of 1000 tuples/sec.
- // Each tuple is Pair<Long timestampMsec, sensor-reading (0..255)>.
- PeriodicRandomSensor simulatedSensorFactory = new PeriodicRandomSensor();
- TStream<Pair<Long,Integer>> raw1khz =
- simulatedSensorFactory.newInteger(t, 1/*periodMsec*/, 255)
- .tag("raw1khz");
- traceStream(raw1khz, "raw1khz");
-
- // Wrap the raw sensor reading in a JsonObject for convenience.
- TStream<JsonObject> j1khz = JsonTuples.wrap(raw1khz, sensorId)
- .tag("j1khz");
- traceStream(j1khz, "j1khz");
-
- // Data-reduction: reduce 1khz samples down to
- // 1hz aggregate statistics samples.
- TStream<JsonObject> j1hzStats = j1khz.last(1000, JsonTuples.keyFn())
- .batch(JsonTuples.statistics(MIN, MAX, MEAN, STDDEV))
- .tag("1hzStats");
-
- // Create a 30 second sliding window of average trailing Mean values
- // and enrich samples with that information.
- j1hzStats = j1hzStats.last(30, JsonTuples.keyFn()).aggregate(
- (samples, key) -> {
- // enrich and return the most recently added tuple
- JsonObject jo = samples.get(samples.size()-1);
- double meanSum = 0;
- for (JsonObject js : samples) {
- meanSum += JsonTuples.getStatistic(js, MEAN).getAsDouble();
- }
- jo.addProperty("AvgTrailingMean", Math.round(meanSum / samples.size()));
- jo.addProperty("AvgTrailingMeanCnt", samples.size());
- return jo;
- })
- .tag("1hzStats.enriched");
- traceStream(j1hzStats, "j1hzStats");
-
- // Detect 1hz samples whose MEAN value are
- // outside the configuration specified range.
- TStream<JsonObject> outside1hzMeanRange = j1hzStats.filter(
- sample -> {
- int value = JsonTuples.getStatistic(sample, MEAN).getAsInt();
- return !range.get().contains(value);
- })
- .tag("outside1hzMeanRange");
- traceStream(outside1hzMeanRange, () -> "outside1hzMeanRange"+range.get());
-
- // Log every outside1hzMeanRange event
- app.utils().logStream(outside1hzMeanRange, "ALERT", "outside1hzMeanRange");
-
- // Conditionally publish every outside1hzMeanRange event.
- // Use a pressureReliever to prevent backpressure if the broker
- // can't be contacted.
- // TODO enhance MqttDevice with configurable reliever.
- app.mqttDevice().events(
- PlumbingStreams.pressureReliever(
- outside1hzMeanRange.filter(tuple -> isPublish1hzOutsideRange.get())
- .tag("outside1hzMeanRangeEvent.conditional"),
- tuple -> 0, 30).tag("outside1hzMeanRangeEvent.pressureRelieved"),
- app.sensorEventId(sensorId, "outside1hzMeanRangeEvent"), QoS.FIRE_AND_FORGET);
-
- // Demonstrate periodic publishing of a sliding window if
- // something changed since it was last published.
- periodicallyPublishLastNInfo(outside1hzMeanRange, 10, 30,
- "periodicLastOutsideRangeEvent");
-
- // TODO histogram: #alerts over the last 8hr
-
- }
-
- /**
- * Periodically publish the lastN on a stream.
- * @param stream tuples to
- * @param count sliding window size "lastN"
- * @param nSec publish frequency
- * @param event sensor's publish event label
- */
- private void periodicallyPublishLastNInfo(TStream<JsonObject> stream,
- int count, int nSec, String event) {
-
- // Demonstrate periodic publishing of a sliding window if
- // something changed since it was last published.
-
- // Maintain a sliding window of the last N tuples.
- // TODO today, windows don't provide "anytime" access to their collection
- // so maintain our own current copy of the collection that we can
- // access it when needed.
- //
- List<JsonObject> lastN = Collections.synchronizedList(new ArrayList<>());
- stream.last(count, JsonTuples.keyFn())
- .aggregate((samples, key) -> samples)
- .tag(event+".lastN")
- .sink(samples -> {
- // Capture the new list/window.
- synchronized(lastN) {
- lastN.clear();
- lastN.addAll(samples);
- }
- });
-
- // Publish the lastN (with trimmed down info) every nSec seconds
- // if anything changed since the last publish.
- TStream<JsonObject> periodicLastN =
- t.poll(() -> 1, nSec, TimeUnit.SECONDS).tag(event+".trigger")
- .filter(trigger -> !lastN.isEmpty()).tag(event+".changed")
- .map(trigger -> {
- synchronized(lastN) {
- // create a single JsonObject with the list
- // of reduced-content samples
- JsonObject jo = new JsonObject();
- jo.addProperty(KEY_ID, sensorId);
- jo.addProperty(KEY_TS, System.currentTimeMillis());
- jo.addProperty("window", count);
- jo.addProperty("pubFreqSec", nSec);
- JsonArray ja = new JsonArray();
- jo.add("lastN", ja);
- for (JsonObject j : lastN) {
- JsonObject jo2 = new JsonObject();
- ja.add(jo2);
- jo2.add(KEY_TS, j.get(KEY_TS));
- // reduce size: include only 2 significant digits
- jo2.addProperty(KEY_READING, String.format("%.2f",
- JsonTuples.getStatistic(j, MEAN).getAsDouble()));
- }
- lastN.clear();
- return jo;
- }
- })
- .tag(event);
-
- traceStream(periodicLastN, event);
-
- // Use a pressureReliever to prevent backpressure if the broker
- // can't be contacted.
- // TODO enhance MqttDevice with configurable reliever.
- app.mqttDevice().events(
- PlumbingStreams.pressureReliever(periodicLastN, tuple -> 0, 30)
- .tag(event+".pressureRelieved"),
- app.sensorEventId(sensorId, event), QoS.FIRE_AND_FORGET);
- }
-
- private String commandId(String commandId) {
- return app.commandId(sensorId, commandId);
- }
-
- private String getCommandValue(JsonObject jo) {
- return app.getCommandValueString(jo);
- }
-
- private <T> TStream<T> traceStream(TStream<T> stream, String label) {
- return traceStream(stream, () -> label);
- }
-
- private <T> TStream<T> traceStream(TStream<T> stream, Supplier<String> label) {
- return app.utils().traceStream(stream, sensorId, label);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java b/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java
deleted file mode 100644
index 59f60e6..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.apps.sensorAnalytics;
-
-import edgent.samples.apps.mqtt.AbstractMqttApplication;
-import edgent.topology.Topology;
-
-/**
- * A sample application demonstrating some common sensor analytic processing
- * themes.
- */
-public class SensorAnalyticsApplication extends AbstractMqttApplication {
-
- public static void main(String[] args) throws Exception {
- if (args.length != 1)
- throw new Exception("missing pathname to application properties file");
-
- SensorAnalyticsApplication application = new SensorAnalyticsApplication(args[0]);
-
- application.run();
- }
-
- /**
- * Create an application instance.
- * @param propsPath pathname to an application configuration file
- * @throws Exception
- */
- SensorAnalyticsApplication(String propsPath) throws Exception {
- super(propsPath);
- }
-
- @Override
- protected void buildTopology(Topology t) {
-
- // Add the "sensor1" analytics to the topology
- new Sensor1(t, this).addAnalytics();
-
- // TODO Add the "sensor2" analytics to the topology
- // TODO Add the "sensor3" analytics to the topology
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/package-info.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/package-info.java b/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/package-info.java
deleted file mode 100644
index a6bfa60..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/package-info.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * The Sensor Analytics sample application demonstrates some common
- * continuous sensor analytic application themes.
- * See {@link edgent.samples.apps.sensorAnalytics.Sensor1 Sensor1} for the
- * core of the analytics processing and
- * {@link edgent.samples.apps.sensorAnalytics.SensorAnalyticsApplication
- * SensorAnalyticsApplication}
- * for the main program.
- * <p>
- * The themes include:
- * <ul>
- * <li>Batched Data Reduction - reducing higher frequency sensor reading
- * samples down to a lower frequency using statistical aggregations
- * of the raw readings.
- * </li>
- * <li>Computing continuous historical statistics such as a
- * 30 second trailing average of sensor readings.
- * </li>
- * <li>Outlier / threshold detection against a configurable range</li>
- * <li>Local logging of stream tuples</li>
- * <li>Publishing analytic results to an MQTT broker</li>
- * <li>Dynamic configuration control - subscribing to a MQTT broker
- * to receive commands to adjust the threshold detection range value.
- * </li>
- * <li>Generally, the configuration of the processing is driven via an
- * external configuration description.
- * </li>
- * <li>Conditional stream tracing - configuration controlled inclusion of tracing.
- * </li>
- * </ul>
- *
- * <h2>Prerequisites:</h2>
- * <p>
- * The default configuration is for a local MQTT broker.
- * A good resource is <a href="http://mosquitto.org">mosquitto.org</a>
- * if you want to download and setup your own MQTT broker.
- * Or you can use some other broker available in your environment.
- * <p>
- * Alternatively, there are some public MQTT brokers available to experiment with.
- * Their availability status isn't guaranteed. If you're unable to connect
- * to the broker, it's likely that it isn't up or your firewalls don't
- * allow you to connect. DO NOT PUBLISH ANYTHING SENSITIVE - anyone
- * can be listing. A couple of public broker locations are noted
- * in the application's properties file.
- * <p>
- * The default {@code mqttDevice.topic.prefix} value, used by default in
- * generated MQTT topic values and MQTT clientId, contains the user's
- * local login id. The SensorAnalytics sample application does not have any
- * other sensitive information.
- * <p>
- * Edit {@code <edgent-release>/java8/scripts/apps/sensorAnalytics/sensoranalytics.properties}
- * to change the broker location or topic prefix.
- *
- * <h2>Application output:</h2>
- * <p>
- * The application periodically (every 30sec), publishes a list of
- * the last 10 outliers to MQTT. When enabled, it also publishes
- * full details of individual outliers as they occur.
- * It also subscribes to MQTT topics for commands to dynamically change the
- * threshold range and whether to publish individual outliers.
- * <p>
- * All MQTT configuration information, including topic patterns,
- * are in the application.properties file.
- * <p>
- * The application logs outlier events in local files. The actual location
- * is specified in the application.properties file.
- * <p>
- * The application generates some output on stdout and stderr.
- * The information includes:
- * <ul>
- * <li>MQTT device info. Lines 1 through 5 in the sample console output below.</li>
- * <li>URL for the Edgent development console. Line 6.</li>
- * <li>Trace of the outlier event stream. Line 7.
- * The output is a label, which includes the active threshold range,
- * followed by the event's JSON.
- * These are the events that will also be logged and conditionally published
- * as well as included in the periodic lastN info published every 30sec.
- * </li>
- * <li>Announcement when a "change threshold" or "enable publish of 1khz outliers"
- * command is received and processed.
- * Line 8 and 9.
- * </li>
- * <li>At this time some INFO trace output from the MQTT connector</li>
- * <li>At this time some INFO trace output from the File connector</li>
- * </ul>
- * Sample console output:
- * <pre>{@code
- * [1] MqttDevice serverURLs [tcp://localhost:1883]
- * [2] MqttDevice clientId id/012345
- * [3] MqttDevice deviceId 012345
- * [4] MqttDevice event topic pattern id/012345/evt/+/fmt/json
- * [5] MqttDevice command topic pattern id/012345/cmd/+/fmt/json
- * [6] Edgent Console URL for the job: http://localhost:57324/console
- * [7] sensor1.outside1hzMeanRange[124..129]: {"id":"sensor1","reading":{"N":1000,"MIN":0.0,"MAX":254.0,"MEAN":130.23200000000006,"STDDEV":75.5535473324351},"msec":1454623874408,"agg.begin.msec":1454623873410,"agg.count":1000,"AvgTrailingMean":128,"AvgTrailingMeanCnt":4}
- * ...
- * [8] ===== Changing range to [125..127] ======
- * sensor1.outside1hzMeanRange[125..127]: {"id":"sensor1","reading":{"N":1000,"MIN":0.0,"MAX":254.0,"MEAN":129.00099999999978,"STDDEV":74.3076080870567},"msec":1454624142419,"agg.begin.msec":1454624141420,"agg.count":1000,"AvgTrailingMean":127,"AvgTrailingMeanCnt":30}
- * [9] ===== Changing isPublish1hzOutsideRange to true ======
- * ...
- * }</pre>
- *
- * <h2>Running, observing and controlling the application:</h2>
- * <pre>{@code
- * $ ./runSensorAnalytics.sh
- * }</pre>
- * <p>
- * To observe the locally logged outlier events:
- * <pre>{@code
- * $ tail -f /tmp/SensorAnalytics/logs/.outside1hzMeanRange
- * }</pre>
- * <p>
- * To observe the events that are getting published to MQTT:
- * <pre>{@code
- * $ ./runDeviceComms.sh watch
- * }</pre>
- * <p>
- * To change the outlier threshold setting:
- * <br>The command value is the new range string: {@code [<lowerBound>..<upperBound>]}.
- * <pre>{@code
- * $ ./runDeviceComms.sh send sensor1.set1hzMeanRangeThreshold "[125..127]"
- * }</pre>
- * <p>
- * To change the "publish individual 1hz outliers" control:
- * <pre>{@code
- * $ ./runDeviceComms.sh send sensor1.setPublish1hzOutsideRange true
- * }</pre>
- *
- * <h3>Alternative MQTT clients</h3>
- * You can use any MQTT client but you will have to specify the
- * MQTT server, the event topics to watch / subscribe to, and the command topics
- * and JSON for publish commands. The MqttDevice output above provides most
- * of the necessary information.
- * <p>
- * For example, the {@code mosquitto_pub} and
- * {@code mosquitto_sub} commands equivalent to the above runDeviceComms.sh
- * commands are:
- * <pre>{@code
- * # Watch the device's event topics
- * $ /usr/local/bin/mosquitto_sub -t id/012345/evt/+/fmt/json
- * # change the outlier threshold setting
- * $ /usr/local/bin/mosquitto_pub -m '{"value":"[125..127]"}' -t id/012345/cmd/sensor1.set1hzMeanRangeThreshold/fmt/json
- * # change the "publish individual 1hz outliers" control
- * $ /usr/local/bin/mosquitto_pub -m '{"value":"true"}' -t id/012345/cmd/sensor1.setPublish1hzOutsideRange/fmt/json
- * }</pre>
- */
-package edgent.samples.apps.sensorAnalytics;
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/AbstractApplication.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/AbstractApplication.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/AbstractApplication.java
new file mode 100644
index 0000000..10099dd
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/AbstractApplication.java
@@ -0,0 +1,130 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Properties;
+
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.samples.apps.mqtt.AbstractMqttApplication;
+import org.apache.edgent.topology.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Application base class.
+ * <p>
+ * Application instances need to:
+ * <ul>
+ * <li>define an implementation for {@link #buildTopology(Topology)}</li>
+ * <li>call {@link #run()} to build and submit the topology for execution.</li>
+ * </ul>
+ * <p>
+ * The class provides some common processing needs:
+ * <ul>
+ * <li>Support for an external configuration file</li>
+ * <li>Provides a {@link TopologyProviderFactory}</li>
+ * <li>Provides a {@link ApplicationUtilities}</li>
+ * </ul>
+ * @see AbstractMqttApplication
+ */
+public abstract class AbstractApplication {
+
+ protected final String propsPath;
+ protected final Properties props;
+ private final ApplicationUtilities applicationUtilities;
+ private static final Logger logger = LoggerFactory.getLogger(AbstractApplication.class);
+
+ protected Topology t;
+
+ public AbstractApplication(String propsPath) throws Exception {
+ this.propsPath = propsPath;
+ props = new Properties();
+ props.load(Files.newBufferedReader(new File(propsPath).toPath()));
+ applicationUtilities = new ApplicationUtilities(props);
+ }
+
+ /**
+ * Construct and run the application's topology.
+ * @throws Exception on failure
+ */
+ protected void run() throws Exception {
+// TODO need to setup logging to squelch stderr output from the runtime/connectors,
+// including paho output
+
+ TopologyProviderFactory tpFactory = new TopologyProviderFactory(props);
+
+ DirectProvider tp = tpFactory.newProvider();
+
+ // Create a topology for the application
+ t = tp.newTopology(config().getProperty("application.name"));
+
+ preBuildTopology(t);
+
+ buildTopology(t);
+
+ // Run the topology
+ HttpServer httpServer = tp.getServices().getService(HttpServer.class);
+ if (httpServer != null) {
+ System.out.println("Edgent Console URL for the job: "
+ + httpServer.getConsoleUrl());
+ }
+ tp.submit(t);
+ }
+
+ /**
+ * Get the application's raw configuration information.
+ * @return the configuration
+ */
+ public Properties config() {
+ return props;
+ }
+
+ /**
+ * Get the application's
+ * @return the helper
+ */
+ public ApplicationUtilities utils() {
+ return applicationUtilities;
+ }
+
+ /**
+ * A hook for a subclass to do things prior to the invocation
+ * of {@link #buildTopology(Topology)}.
+ * <p>
+ * The default implementation is a no-op.
+ * @param t the application's topology
+ */
+ protected void preBuildTopology(Topology t) {
+ return;
+ }
+
+ /**
+ * Build the application's topology.
+ * @param t Topology to add to
+ */
+ abstract protected void buildTopology(Topology t);
+
+ public void handleRuntimeError(String msg, Exception e) {
+ logger.error("A runtime error occurred", e);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/ApplicationUtilities.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/ApplicationUtilities.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/ApplicationUtilities.java
new file mode 100644
index 0000000..b8a0cee
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/ApplicationUtilities.java
@@ -0,0 +1,255 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps;
+
+import java.io.File;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.edgent.analytics.sensors.Range;
+import org.apache.edgent.analytics.sensors.Ranges;
+import org.apache.edgent.connectors.file.FileStreams;
+import org.apache.edgent.connectors.file.FileWriterCycleConfig;
+import org.apache.edgent.connectors.file.FileWriterFlushConfig;
+import org.apache.edgent.connectors.file.FileWriterPolicy;
+import org.apache.edgent.connectors.file.FileWriterRetentionConfig;
+import org.apache.edgent.function.Predicate;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TStream;
+
+/**
+ * Some general purpose application configuration driven utilities.
+ * <p>
+ * Utilities include:
+ * <ul>
+ * <li>Get a property name for a sensor configuration item</li>
+ * <li>Get a Range value for a sensor range item</li>
+ * <li>Log a stream</li>
+ * <li>Conditionally trace a stream</li>
+ * </ul>
+ */
+public class ApplicationUtilities {
+
+ private final Properties props;
+
+ public ApplicationUtilities(Properties props) {
+ this.props = props;
+ }
+
+ private Properties config() {
+ return props;
+ }
+
+ /**
+ * Trace a stream to System.out if the sensor id's "label" has been configured
+ * to enable tracing.
+ * <p>
+ * If tracing has not been enabled in the config, the topology will not
+ * be augmented to trace the stream.
+ *
+ * @param <T> Tuple type
+ * @param stream the stream to trace
+ * @param sensorId the sensor id
+ * @param label some unique label
+ * @return the input stream
+ */
+ public <T> TStream<T> traceStream(TStream<T> stream, String sensorId, Supplier<String> label) {
+ return traceStream(stream, () -> sensorId+"."+label.get());
+ }
+
+ /**
+ * Trace a stream to System.out if the "label" has been configured
+ * to enable tracing.
+ * <p>
+ * If tracing has not been enabled in the config, the topology will not
+ * be augmented to trace the stream.
+ *
+ * @param <T> Tuple type
+ * @param stream the stream to trace
+ * @param label some unique label
+ * @return the input stream
+ */
+ public <T> TStream<T> traceStream(TStream<T> stream, Supplier<String> label) {
+ if (includeTraceStreamOps(label.get())) {
+ TStream<?> s = stream.filter(traceTuplesFn(label.get())).tag(label.get()+".trace");
+ s.peek(sample -> System.out.println(String.format("%s: %s", label.get(), sample.toString())));
+ }
+ return stream;
+ }
+
+ private boolean includeTraceStreamOps(String label) {
+ String includesCsv = config().getProperty("stream.tracing.includes.csv", "");
+ String includesRegex = config().getProperty("stream.tracing.includes.regex", "");
+ String excludesCsv = config().getProperty("stream.tracing.excludes.csv", "");
+ String excludesRegex = config().getProperty("stream.tracing.excludes.regex", "");
+
+ Set<String> includesSet = new HashSet<>();
+ for (String s : includesCsv.split(","))
+ includesSet.add(s.trim());
+ Set<String> excludesSet = new HashSet<>();
+ for (String s : excludesCsv.split(","))
+ excludesSet.add(s.trim());
+
+ boolean isIncluded = false;
+ if (includesSet.contains(label) || label.matches(includesRegex))
+ isIncluded = true;
+ if (excludesSet.contains(label) || label.matches(excludesRegex))
+ isIncluded = false;
+
+ return isIncluded;
+ }
+
+ private <T> Predicate<T> traceTuplesFn(String label) {
+ return tuple -> true; // TODO make dynamic config; affected by "label" value
+ // check label for match against csv or regex from props
+ }
+
+ /**
+ * Get the property name for a sensor's configuration item.
+ * @param sensorId the sensor's id
+ * @param label the label for an instance of "kind" (e.g., "tempThreshold")
+ * @param kind the kind of configuration item (e.g., "range")
+ * @return the configuration property name
+ */
+ public String getSensorPropertyName(String sensorId, String label, String kind) {
+ String name = kind + "." + label; // kind.label
+ if (sensorId!=null && !sensorId.isEmpty())
+ name = sensorId + "." + name; // sensorId.kind.label
+ return name;
+ }
+
+ private String getSensorConfigValue(String sensorId, String label, String kind) {
+ String name = getSensorPropertyName(sensorId, label, kind);
+ String val = config().getProperty(name);
+ if (val==null)
+ throw new IllegalArgumentException("Missing configuration property "+name);
+ return val;
+ }
+
+ /**
+ * Get the Range for a sensor range configuration item.
+ * @param sensorId the sensor's id
+ * @param label the range's label
+ * @return the Range
+ */
+ public Range<Integer> getRangeInteger(String sensorId, String label) {
+ String val = getSensorConfigValue(sensorId, label, "range");
+ return Ranges.valueOfInteger(val);
+ }
+
+ /**
+ * Get the Range for a sensor range configuration item.
+ * @param sensorId the sensor's id
+ * @param label the range's label
+ * @return the Range
+ */
+ public Range<Byte> getRangeByte(String sensorId, String label) {
+ String val = getSensorConfigValue(sensorId, label, "range");
+ return Ranges.valueOfByte(val);
+ }
+
+ /**
+ * Get the Range for a sensor range configuration item.
+ * @param sensorId the sensor's id
+ * @param label the range's label
+ * @return the Range
+ */
+ public Range<Short> getRangeShort(String sensorId, String label) {
+ String val = getSensorConfigValue(sensorId, label, "range");
+ return Ranges.valueOfShort(val);
+ }
+
+ /**
+ * Get the Range for a sensor range configuration item.
+ * @param sensorId the sensor's id
+ * @param label the range's label
+ * @return the Range
+ */
+ public Range<Float> getRangeFloat(String sensorId, String label) {
+ String val = getSensorConfigValue(sensorId, label, "range");
+ return Ranges.valueOfFloat(val);
+ }
+
+ /**
+ * Get the Range for a sensor range configuration item.
+ * @param sensorId the sensor's id
+ * @param label the range's label
+ * @return the Range
+ */
+ public Range<Double> getRangeDouble(String sensorId, String label) {
+ String val = getSensorConfigValue(sensorId, label, "range");
+ return Ranges.valueOfDouble(val);
+ }
+
+ /**
+ * Log every tuple on the stream using the {@code FileStreams} connector.
+ * <p>
+ * The logs are added to the directory as specified
+ * by the "application.log.dir" property.
+ * The directory will be created as needed.
+ * <p>
+ * The "active" (open / being written) log file name is {@code .<baseName>}.
+ * <br>
+ * Completed stable logs have a name of {@code <baseName>_YYYYMMDD_HHMMSS}.
+ * <p>
+ * The log entry format being used is:
+ * {@code [<date>] [<eventTag>] <tuple>.toString()}
+ * <p>
+ * See {@link FileStreams#textFileWriter(TStream, org.apache.edgent.function.Supplier, org.apache.edgent.function.Supplier)}
+ *
+ * @param <T> Tuple type
+ * @param stream the TStream
+ * @param baseName the base log name
+ * @param eventTag a tag that gets added to the log entry
+ * @return the input stream
+ */
+ public <T> TStream<T> logStream(TStream<T> stream, String eventTag, String baseName) {
+ // Define the writer policy.
+ // TODO could make the policy configurable via config()
+ FileWriterPolicy<String> policy = new FileWriterPolicy<String>(
+ FileWriterFlushConfig.newTimeBasedConfig(2_000/*msec*/), // flush every 2sec
+ FileWriterCycleConfig.newFileSizeBasedConfig(10_000), // new file every 10KB
+ FileWriterRetentionConfig.newFileCountBasedConfig(1) // retain 1 file
+ );
+
+ // Compose the base file pathname
+ File dir = new File(config().getProperty("application.log.dir"));
+ String basePathname = new File(dir, baseName).toString();
+
+ // Transform the stream to a TStream<String> of string log entry values
+ TStream<String> stringEntries = stream.map(sample -> String.format("[%s] [%s] %s", new Date().toString(), eventTag, sample.toString()))
+ .tag(baseName+".log");
+
+ // Use the FileStreams connector to write the logs.
+ //
+ // A hack for getting the log directories created at runtime
+ // TODO add another policy thing... or simply make textFileWriter do it?
+ //
+ FileStreams.textFileWriter(stringEntries,
+ () -> { if (!dir.exists()) dir.mkdirs();
+ return basePathname;
+ },
+ () -> policy);
+
+ return stream;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/JsonTuples.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/JsonTuples.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/JsonTuples.java
new file mode 100644
index 0000000..0c231c8
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/JsonTuples.java
@@ -0,0 +1,196 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps;
+
+import static org.apache.edgent.analytics.math3.stat.Statistic.MAX;
+import static org.apache.edgent.analytics.math3.stat.Statistic.MEAN;
+import static org.apache.edgent.analytics.math3.stat.Statistic.MIN;
+import static org.apache.edgent.analytics.math3.stat.Statistic.STDDEV;
+
+import java.util.List;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.edgent.analytics.math3.json.JsonAnalytics;
+import org.apache.edgent.analytics.math3.stat.Statistic;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.topology.TStream;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * Utilties to ease working working with sensor "samples" by wrapping them
+ * in JsonObjects.
+ * <p>
+ * The Json Tuple sensor "samples" have a standard collection of properties.
+ */
+public class JsonTuples {
+
+ /*
+ * Common attributes in the JsonObject
+ */
+ public static final String KEY_ID = "id";
+ public static final String KEY_TS = "msec";
+ public static final String KEY_READING = "reading";
+ public static final String KEY_AGG_BEGIN_TS = "agg.begin.msec";
+ public static final String KEY_AGG_COUNT = "agg.count";
+
+ /**
+ * Create a JsonObject wrapping a raw {@code Pair<Long msec,T reading>>} sample.
+ * @param <T> Tuple type
+ * @param sample the raw sample
+ * @param id the sensor's Id
+ * @return the wrapped sample
+ */
+ public static <T> JsonObject wrap(Pair<Long,T> sample, String id) {
+ JsonObject jo = new JsonObject();
+ jo.addProperty(KEY_ID, id);
+ jo.addProperty(KEY_TS, sample.getFirst());
+ T value = sample.getSecond();
+ if (value instanceof Number)
+ jo.addProperty(KEY_READING, (Number)sample.getSecond());
+ else if (value instanceof String)
+ jo.addProperty(KEY_READING, (String)sample.getSecond());
+ else if (value instanceof Boolean)
+ jo.addProperty(KEY_READING, (Boolean)sample.getSecond());
+// else if (value instanceof array) {
+// // TODO cvt to JsonArray
+// }
+// else if (value instanceof Object) {
+// // TODO cvt to JsonObject
+// }
+ else {
+ Class<?> clazz = value != null ? value.getClass() : Object.class;
+ throw new IllegalArgumentException("Unhandled value type: "+ clazz);
+ }
+ return jo;
+ }
+
+ /**
+ * Create a stream of JsonObject wrapping a stream of
+ * raw {@code Pair<Long msec,T reading>>} samples.
+ *
+ * @param <T> Tuple type
+ * @param stream the raw input stream
+ * @param id the sensor's Id
+ * @return the wrapped stream
+ */
+ public static <T> TStream<JsonObject> wrap(TStream<Pair<Long,T>> stream, String id) {
+ return stream.map(pair -> wrap(pair, id));
+ }
+
+ /**
+ * The partition key function for wrapped sensor samples.
+ * <p>
+ * The {@code KEY_ID} property is returned for the key.
+ * @return the function
+ */
+ public static Function<JsonObject,String> keyFn() {
+ return sample -> sample.get(KEY_ID).getAsString();
+ }
+
+
+ /**
+ * Get a statistic value from a sample.
+ * <p>
+ * Same as {@code getStatistic(jo, JsonTuples.KEY_READING, stat)}.
+ *
+ * @param jo the sample
+ * @param stat the Statistic of interest
+ * @return the JsonElement for the Statistic
+ * @throws RuntimeException of the stat isn't present
+ */
+ public static JsonElement getStatistic(JsonObject jo, Statistic stat) {
+ return getStatistic(jo, JsonTuples.KEY_READING, stat);
+ }
+
+ /**
+ * Get a statistic value from a sample.
+ * <p>
+ * Convenience for working with samples containing a property
+ * whose value is one or more {@link Statistic}
+ * as created by
+ * {@link JsonAnalytics#aggregate(org.apache.edgent.topology.TWindow, String, String, org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate...) JsonAnalytics.aggregate()}
+ *
+ * @param jo the sample
+ * @param valueKey the name of the property containing the JsonObject of Statistics
+ * @param stat the Statistic of interest
+ * @return the JsonElement for the Statistic
+ * @throws RuntimeException of the stat isn't present
+ */
+ public static JsonElement getStatistic(JsonObject jo, String valueKey, Statistic stat) {
+ JsonObject statsjo = jo.get(valueKey).getAsJsonObject();
+ return statsjo.get(stat.name());
+ }
+
+ /**
+ * Create a function that computes the specified statistics on the list of
+ * samples and returns a new sample containing the result.
+ * <p>
+ * The single tuple contains the specified statistics computed over
+ * all of the {@code JsonTuple.KEY_READING}
+ * values from {@code List<JsonObject>}.
+ * <p>
+ * The resulting sample contains the properties:
+ * <ul>
+ * <li>JsonTuple.KEY_ID</li>
+ * <li>JsonTuple.KEY_MSEC - msecTimestamp of the last sample in the window</li>
+ * <li>JsonTuple.KEY_AGG_BEGIN_MSEC - msecTimestamp of the first sample in the window</li>
+ * <li>JsonTuple.KEY_AGG_COUNT - number of samples in the window ({@code value=factor})</li>
+ * <li>JsonTuple.KEY_READING - a JsonObject of the statistics
+ * as defined by
+ * {@link JsonAnalytics#aggregate(org.apache.edgent.topology.TWindow, String, String, org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate...) JsonAnalytics.aggregate()}
+ * </ul>
+ * <p>
+ * Sample use:
+ * <pre>{@code
+ * TStream<JsonObject> s = ...
+ * // reduce s by a factor of 100 with stats MEAN and STDEV
+ * TStream<JsonObject> reduced = s.batch(100, statistics(Statistic.MEAN, Statistic.STDDEV));
+ * }</pre>
+ *
+ * @param statistics the statistics to calculate over the window
+ * @return {@code TStream<JsonObject>} for the reduced {@code stream}
+ */
+ public static BiFunction<List<JsonObject>,String,JsonObject> statistics(Statistic... statistics) {
+ BiFunction<List<JsonObject>,JsonElement,JsonObject> statsFn =
+ JsonAnalytics.aggregateList(KEY_ID, KEY_READING,
+ j -> j.get(KEY_READING).getAsDouble(),
+ MIN, MAX, MEAN, STDDEV);
+
+ return (samples, key) -> {
+ JsonObject jo = statsFn.apply(samples, samples.get(0).get(KEY_ID));
+ JsonTuples.addAggStdInfo(jo, samples);
+ return jo;
+ };
+ }
+
+ private static void addAggStdInfo(JsonObject jo, List<JsonObject> samples) {
+ // beginMsec, endMsec, nSamples
+ long msec = samples.get(0).get(KEY_TS).getAsLong();
+ long msec2 = samples.get(samples.size()-1).get(KEY_TS).getAsLong();
+ int nSamples = samples.size();
+
+ jo.addProperty(KEY_TS, msec2);
+ jo.addProperty(KEY_AGG_BEGIN_TS, msec);
+ jo.addProperty(KEY_AGG_COUNT, nSamples);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/TopologyProviderFactory.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/TopologyProviderFactory.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/TopologyProviderFactory.java
new file mode 100644
index 0000000..1128fcb
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/TopologyProviderFactory.java
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps;
+
+import java.util.Properties;
+
+import org.apache.edgent.providers.direct.DirectProvider;
+
+/**
+ * A configuration driven factory for an Edgent topology provider.
+ */
+public class TopologyProviderFactory {
+ private final Properties props;
+
+ /**
+ * Construct a factory
+ * @param props configuration information.
+ */
+ public TopologyProviderFactory(Properties props) {
+ this.props = props;
+ }
+
+ /**
+ * Get a new topology provider.
+ * <p>
+ * The default provider is {@code org.apache.edgent.providers.direct.DirectProvider}.
+ * <p>
+ * The {@code topology.provider} configuration property can specify
+ * an alternative.
+ *
+ * @return the provider
+ * @throws Exception if the provider couldn't be created
+ */
+ public DirectProvider newProvider() throws Exception {
+ String name = props.getProperty("topology.provider", "org.apache.edgent.providers.direct.DirectProvider");
+ Class<?> clazz = null;
+ try {
+ clazz = Class.forName(name);
+ }
+ catch (ClassNotFoundException e) {
+ String msg = "Class not found: "+e.getLocalizedMessage();
+ System.err.println(msg);
+ throw new IllegalStateException(msg);
+ }
+ return (DirectProvider) clazz.newInstance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/applicationTemplate.properties
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/applicationTemplate.properties b/samples/apps/src/main/java/org/apache/edgent/samples/apps/applicationTemplate.properties
new file mode 100644
index 0000000..c45d3db
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/applicationTemplate.properties
@@ -0,0 +1,81 @@
+# A template file for Application Configuration properties
+#
+# The default Edgent topology provider is DirectProvider
+#topology.provider=org.apache.edgent.providers.development.DevelopmentProvider
+#
+application.name=MyAnalytics
+#
+
+# =========================================================================
+# Application stream logging configuration
+# Where the app puts its stream logs.
+# The directory will be created when the topology
+# runs if it doesn't already exist.
+application.log.dir=/tmp/MyAnalytics/logs
+
+# =========================================================================
+# Application "ranges" - e.g., for threshold detections
+# Specify values generated by Range.toString():
+# <lowerBoundType><lowerBound>..<upperBound><upperBoundType>
+# where
+# lowerBoundType is "[" inclusive or "(" exclusive
+# upperBoundType is "]" inclusive or ")" exclusive
+# lowerBound or upperBound is "*" for open ranges,
+# e.g., [*..50] for "atMost" 50
+#
+sensor1.range.outside1hzMeanRange=[124..129]
+
+# =========================================================================
+# MQTT Device and Connector configuration info.
+#
+# MQTT Device -- See org.apache.edgent.connectors.mqtt.device.MqttDevice for all
+# of the properties.
+#
+# An optional topic prefix. It can be used to isolate users or applications
+# in shared MQTT broker configurations. By default it is incorporated
+# into device topics and the MQTT clientId.
+# If you use a public MQTT broker you may want to change the topic
+# prefix so it is still unique for you but doesn't include the
+# user name or application name.
+mqttDevice.topic.prefix=ibm.xyzzy-streams.samples/user/{user.name}/{application.name}/
+#
+# The device id used for identifying the device's events and commands
+# in the MQTT topic namespace.
+# By default it also gets incorporated into the MQTT clientId value.
+mqttDevice.id=012345
+#
+# The MQTT clientId. Only one instance of a MqttDevice can connect
+# to the MQTT broker with a given clientId.
+#mqttDevice.mqtt.clientId={mqttDevice.topic.prefix}id/{mqttDevice.id}
+#
+# MQTT Connector See org.apache.edgent.connectors.mqtt.MqttConfig.fromProperties()
+#
+# The default configuration is for a local MQTT broker.
+# See mosquitto.org for instructions on downloading a MQTT broker.
+# Or use some other MQTT broker available in your environment.
+mqtt.serverURLs=tcp://localhost:1883
+#
+# Alternatively, there are some public MQTT brokers available to experiment with.
+# Their availability status isn't guaranteed. If you're unable to connect
+# to the broker, it's likely that it isn't up or your firewalls don't
+# allow you to connect. DO NOT PUBLISH ANYTHING SENSITIVE - anyone
+# can be listing.
+#mqtt.serverURLs=tcp://iot.eclipse.org:1883
+#mqtt.serverURLs=tcp://test.mosquitto.org:1883
+#
+#mqtt.userName=xyzzy
+#mqtt.password=myMosquittoPw
+
+# =========================================================================
+# Patterns for identifying which streams to trace to System.out
+# To enable use include.csv and/or includes.regex.
+# To exclude an otherwise included file, use excludes.csv and/or excludes.regex
+#
+# Some tracing labels
+# sensor1.raw1khz,sensor1.j1khz,sensor1.j1hzStats,sensor1.outside1hzMeanRange*,
+# sensor1.periodicLastN*
+#
+#stream.tracing.includes.csv=sensor1.j1hzStats
+stream.tracing.includes.regex=sensor1.outside1hzMeanRange.*
+#stream.tracing.excludes.regex=.*
+#stream.tracing.excludes.csv=sensor1.raw1khz
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/AbstractMqttApplication.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/AbstractMqttApplication.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/AbstractMqttApplication.java
new file mode 100644
index 0000000..6bd943a
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/AbstractMqttApplication.java
@@ -0,0 +1,121 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps.mqtt;
+
+import static org.apache.edgent.connectors.iot.IotDevice.CMD_PAYLOAD;
+
+import java.util.Arrays;
+
+import org.apache.edgent.connectors.mqtt.iot.MqttDevice;
+import org.apache.edgent.samples.apps.AbstractApplication;
+import org.apache.edgent.samples.apps.ApplicationUtilities;
+import org.apache.edgent.samples.apps.TopologyProviderFactory;
+import org.apache.edgent.topology.Topology;
+
+import com.google.gson.JsonObject;
+
+/**
+ * An MQTT Application base class.
+ * <p>
+ * Application instances need to:
+ * <ul>
+ * <li>define an implementation for {@link #buildTopology(Topology)}</li>
+ * <li>call {@link #run()} to build and submit the topology for execution.</li>
+ * </ul>
+ * <p>
+ * The class provides some common processing needs:
+ * <ul>
+ * <li>Support for an external configuration file</li>
+ * <li>Provides a {@link TopologyProviderFactory}</li>
+ * <li>Provides a {@link ApplicationUtilities}</li>
+ * <li>Provides a {@link MqttDevice}</li>
+ * </ul>
+ */
+public abstract class AbstractMqttApplication extends AbstractApplication {
+
+ private MqttDevice mqttDevice;
+
+ public AbstractMqttApplication(String propsPath) throws Exception {
+ super(propsPath);
+ }
+
+ @Override
+ protected void preBuildTopology(Topology t) {
+ // Add an MQTT device communication manager to the topology
+ updateTopicPrefix();
+ mqttDevice = new MqttDevice(t, props);
+ System.out.println("MqttDevice serverURLs " + Arrays.toString(mqttDevice.getMqttConfig().getServerURLs()));
+ System.out.println("MqttDevice clientId " + mqttDevice.getMqttConfig().getClientId());
+ System.out.println("MqttDevice deviceId " + props.getProperty("mqttDevice.id"));
+ System.out.println("MqttDevice event topic pattern " + mqttDevice.eventTopic(null));
+ System.out.println("MqttDevice command topic pattern " + mqttDevice.commandTopic(null));
+ }
+
+ /**
+ * Get the application's MqttDevice
+ * @return the MqttDevice
+ */
+ public MqttDevice mqttDevice() {
+ return mqttDevice;
+ }
+
+ private void updateTopicPrefix() {
+ String val = props.getProperty("mqttDevice.topic.prefix");
+ if (val != null) {
+ val = val.replace("{user.name}", System.getProperty("user.name"));
+ val = val.replace("{application.name}", props.getProperty("application.name"));
+ props.setProperty("mqttDevice.topic.prefix", val);
+ }
+ }
+
+ /**
+ * Compose a MqttDevice eventId for the sensor.
+ * @param sensorId the sensor id
+ * @param eventId the sensor's eventId
+ * @return the device eventId
+ */
+ public String sensorEventId(String sensorId, String eventId) {
+ return sensorId + "." + eventId;
+ }
+
+ /**
+ * Compose a MqttDevice commandId for the sensor
+ * @param sensorId the sensor id
+ * @param commandId the sensor's commandId
+ * @return the device commandId
+ */
+ public String commandId(String sensorId, String commandId) {
+ return sensorId + "." + commandId;
+ }
+
+ /**
+ * Extract a simple string valued command arg
+ * from a {@link MqttDevice#commands(String...)} returned
+ * JsonObject tuple.
+ * <p>
+ * Interpret the JsonObject's embedded payload as a JsonObject with a single
+ * "value" property.
+ * @param jo the command tuple.
+ * @return the command's argument value
+ */
+ public String getCommandValueString(JsonObject jo) {
+ return jo.get(CMD_PAYLOAD).getAsJsonObject().get("value").getAsString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/DeviceCommsApp.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/DeviceCommsApp.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/DeviceCommsApp.java
new file mode 100644
index 0000000..70deaa4
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/DeviceCommsApp.java
@@ -0,0 +1,114 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps.mqtt;
+
+import org.apache.edgent.connectors.iot.QoS;
+import org.apache.edgent.connectors.mqtt.MqttStreams;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.json.JsonFunctions;
+
+import com.google.gson.JsonObject;
+
+/**
+ * An MQTT Device Communications client for watching device events
+ * and sending commands.
+ * <p>
+ * This is an "application properties" aware client that gets MQTT configuration
+ * from an Edgent sample app application configuration properties file.
+ * <p>
+ * This client avoids the need for other MQTT clients (e.g., from a mosquitto
+ * installation) to observe and control the applications.
+ */
+public class DeviceCommsApp extends AbstractMqttApplication {
+
+ private static final String usage = "Usage: watch | send <cmdLabel> <cmdArg>";
+
+ private String mode;
+ private String cmdLabel;
+ private String cmdArg;
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 1)
+ throw new Exception("missing pathname to application properties file");
+
+ try {
+ int i = 0;
+ DeviceCommsApp application = new DeviceCommsApp(args[i++]);
+ String mode = args[i++];
+ if (!("watch".equals(mode) || "send".equals(mode))) {
+ throw new IllegalArgumentException("Unsupport mode: "+application.mode);
+ }
+ application.mode = mode;
+ if (application.mode.equals("send")) {
+ application.cmdLabel = args[i++];
+ application.cmdArg = args[i++];
+ }
+
+ application.run();
+ }
+ catch (IllegalArgumentException | IndexOutOfBoundsException e) {
+ throw new IllegalArgumentException(e.getMessage()
+ +"\n"+usage);
+ }
+ }
+
+ /**
+ * Create an application instance.
+ * @param propsPath pathname to an application configuration file
+ * @throws Exception
+ */
+ DeviceCommsApp(String propsPath) throws Exception {
+ super(propsPath);
+ }
+
+ @Override
+ protected void buildTopology(Topology t) {
+ mqttDevice().getMqttConfig().setClientId(null);
+ MqttStreams mqtt = new MqttStreams(t, () -> mqttDevice().getMqttConfig());
+ if (mode.equals("send")) {
+ String topic = mqttDevice().commandTopic(cmdLabel);
+ JsonObject jo = new JsonObject();
+ jo.addProperty("value", cmdArg);
+ System.out.println("Publishing command: topic="+topic+" value="+jo);
+ TStream<String> cmd = t.strings(JsonFunctions.asString().apply(jo));
+ mqtt.publish(cmd, topic, QoS.FIRE_AND_FORGET, false/*retain*/);
+ // Hmm... the paho MQTT *non-daemon* threads prevent the app
+ // from exiting after returning from main() following job submit().
+ // Lacking MqttStreams.shutdown() or such...
+ // Delay a bit and then explicitly exit(). Ugh.
+ cmd.sink(tuple -> {
+ try {
+ Thread.sleep(3*1000);
+ } catch (Exception e) { }
+ System.exit(0); });
+ }
+ else if (mode.equals("watch")) {
+ String topicFilter = mqttDevice().eventTopic(null);
+ System.out.println("Watching topic filter "+topicFilter);
+ TStream<String> events = mqtt.subscribe(topicFilter, QoS.FIRE_AND_FORGET,
+ (topic,payload) -> {
+ String s = "\n# topic "+topic;
+ s += "\n" + new String(payload);
+ return s;
+ });
+ events.print();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/package-info.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/package-info.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/package-info.java
new file mode 100644
index 0000000..54598cb
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/package-info.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Base support for Edgent MQTT based application samples.
+ * <p>
+ * This package builds on {@code org.apache.edgent.samples.apps} providing
+ * additional common capabilities in the area of MQTT based device appliations.
+ */
+package org.apache.edgent.samples.apps.mqtt;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/package-info.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/package-info.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/package-info.java
new file mode 100644
index 0000000..cf62ba2
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/package-info.java
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Support for some more complex Edgent application samples.
+ * <p>
+ * This package provides some commonly needed capabilities particularly in
+ * the area of an external configuration description influence on various
+ * things.
+ * <p>
+ * Focal areas:
+ * <ul>
+ * <li>{@link org.apache.edgent.samples.apps.AbstractApplication} - a base class for
+ * Edgent applications providing commonly needed features.
+ * </li>
+ * <li>{@link org.apache.edgent.samples.apps.TopologyProviderFactory} - a configuration
+ * driven factory for an Edgent topology provider.
+ * </li>
+ * <li>{@link org.apache.edgent.samples.apps.ApplicationUtilities} - some
+ * general configuration driven utilities.
+ * </li>
+ * <li>{@link org.apache.edgent.samples.apps.JsonTuples} - utilities for wrapping
+ * sensor samples in a JsonObject and operating on it.
+ * </li>
+ * </ul>
+ */
+package org.apache.edgent.samples.apps;
\ No newline at end of file