You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:08 UTC

[07/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/applicationTemplate.properties
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/applicationTemplate.properties b/samples/apps/src/main/java/edgent/samples/apps/applicationTemplate.properties
deleted file mode 100644
index c96ad00..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/applicationTemplate.properties
+++ /dev/null
@@ -1,81 +0,0 @@
-# A template file for Application Configuration properties
-#
-# The default Edgent topology provider is DirectProvider
-#topology.provider=edgent.providers.development.DevelopmentProvider
-#
-application.name=MyAnalytics
-#
-
-# =========================================================================
-# Application stream logging configuration
-# Where the app puts its stream logs.  
-# The directory will be created when the topology
-# runs if it doesn't already exist.
-application.log.dir=/tmp/MyAnalytics/logs
-
-# =========================================================================
-# Application "ranges" - e.g., for threshold detections
-# Specify values generated by Range.toString():
-#  <lowerBoundType><lowerBound>..<upperBound><upperBoundType>
-#  where
-#      lowerBoundType is "[" inclusive or "(" exclusive
-#      upperBoundType is "]" inclusive or ")" exclusive
-#      lowerBound or upperBound is "*" for open ranges,
-#         e.g., [*..50]  for "atMost" 50
-#
-sensor1.range.outside1hzMeanRange=[124..129]
-
-# =========================================================================
-# MQTT Device and Connector configuration info.
-#
-# MQTT Device -- See edgent.connectors.mqtt.device.MqttDevice for all
-# of the properties.
-#
-# An optional topic prefix.  It can be used to isolate users or applications
-# in shared MQTT broker configurations.  By default it is incorporated
-# into device topics and the MQTT clientId.
-# If you use a public MQTT broker you may want to change the topic
-# prefix so it is still unique for you but doesn't include the
-# user name or application name.
-mqttDevice.topic.prefix=ibm.xyzzy-streams.samples/user/{user.name}/{application.name}/
-#
-# The device id used for identifying the device's events and commands
-# in the MQTT topic namespace.
-# By default it also gets incorporated into the MQTT clientId value.
-mqttDevice.id=012345
-#
-# The MQTT clientId.  Only one instance of a MqttDevice can connect
-# to the MQTT broker with a given clientId.
-#mqttDevice.mqtt.clientId={mqttDevice.topic.prefix}id/{mqttDevice.id}
-#
-# MQTT Connector  See edgent.connectors.mqtt.MqttConfig.fromProperties()
-#
-# The default configuration is for a local MQTT broker.
-# See mosquitto.org for instructions on downloading a MQTT broker.
-# Or use some other MQTT broker available in your environment.
-mqtt.serverURLs=tcp://localhost:1883
-#
-# Alternatively, there are some public MQTT brokers available to experiment with.
-# Their availability status isn't guaranteed.  If you're unable to connect
-# to the broker, it's likely that it isn't up or your firewalls don't
-# allow you to connect.  DO NOT PUBLISH ANYTHING SENSITIVE - anyone
-# can be listing.
-#mqtt.serverURLs=tcp://iot.eclipse.org:1883
-#mqtt.serverURLs=tcp://test.mosquitto.org:1883
-#
-#mqtt.userName=xyzzy
-#mqtt.password=myMosquittoPw
-
-# =========================================================================
-# Patterns for identifying which streams to trace to System.out
-# To enable use include.csv and/or includes.regex.
-# To exclude an otherwise included file, use excludes.csv and/or excludes.regex
-#
-# Some tracing labels
-# sensor1.raw1khz,sensor1.j1khz,sensor1.j1hzStats,sensor1.outside1hzMeanRange*,
-# sensor1.periodicLastN*
-#
-#stream.tracing.includes.csv=sensor1.j1hzStats
-stream.tracing.includes.regex=sensor1.outside1hzMeanRange.*
-#stream.tracing.excludes.regex=.*
-#stream.tracing.excludes.csv=sensor1.raw1khz

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/mqtt/AbstractMqttApplication.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/mqtt/AbstractMqttApplication.java b/samples/apps/src/main/java/edgent/samples/apps/mqtt/AbstractMqttApplication.java
deleted file mode 100644
index 46fa96d..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/mqtt/AbstractMqttApplication.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.apps.mqtt;
-
-import static edgent.connectors.iot.IotDevice.CMD_PAYLOAD;
-
-import java.util.Arrays;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.mqtt.iot.MqttDevice;
-import edgent.samples.apps.AbstractApplication;
-import edgent.samples.apps.ApplicationUtilities;
-import edgent.samples.apps.TopologyProviderFactory;
-import edgent.topology.Topology;
-
-/**
- * An MQTT Application base class.
- * <p>
- * Application instances need to:
- * <ul>
- * <li>define an implementation for {@link #buildTopology(Topology)}</li>
- * <li>call {@link #run()} to build and submit the topology for execution.</li>
- * </ul>
- * <p>
- * The class provides some common processing needs:
- * <ul>
- * <li>Support for an external configuration file</li>
- * <li>Provides a {@link TopologyProviderFactory}</li>
- * <li>Provides a {@link ApplicationUtilities}</li>
- * <li>Provides a {@link MqttDevice}</li>
- * </ul>
- */
-public abstract class AbstractMqttApplication extends AbstractApplication {
-    
-    private MqttDevice mqttDevice;
-    
-    public AbstractMqttApplication(String propsPath) throws Exception {
-        super(propsPath);
-    }
-    
-    @Override
-    protected void preBuildTopology(Topology t) {
-        // Add an MQTT device communication manager to the topology
-        updateTopicPrefix();
-        mqttDevice = new MqttDevice(t, props);
-        System.out.println("MqttDevice serverURLs " + Arrays.toString(mqttDevice.getMqttConfig().getServerURLs()));
-        System.out.println("MqttDevice clientId " + mqttDevice.getMqttConfig().getClientId());
-        System.out.println("MqttDevice deviceId " + props.getProperty("mqttDevice.id"));
-        System.out.println("MqttDevice event topic pattern " + mqttDevice.eventTopic(null));
-        System.out.println("MqttDevice command topic pattern " + mqttDevice.commandTopic(null));
-    }
-    
-    /**
-     * Get the application's MqttDevice
-     * @return the MqttDevice
-     */
-    public MqttDevice mqttDevice() {
-        return mqttDevice;
-    }
-    
-    private void updateTopicPrefix() {
-        String val = props.getProperty("mqttDevice.topic.prefix");
-        if (val != null) {
-            val = val.replace("{user.name}", System.getProperty("user.name"));
-            val = val.replace("{application.name}", props.getProperty("application.name"));
-            props.setProperty("mqttDevice.topic.prefix", val);
-        }
-    }
-    
-    /**
-     * Compose a MqttDevice eventId for the sensor.
-     * @param sensorId the sensor id
-     * @param eventId the sensor's eventId
-     * @return the device eventId
-     */
-    public String sensorEventId(String sensorId, String eventId) {
-        return sensorId + "." + eventId;
-    }
-    
-    /**
-     * Compose a MqttDevice commandId for the sensor
-     * @param sensorId the sensor id
-     * @param commandId the sensor's commandId
-     * @return the device commandId
-     */
-    public String commandId(String sensorId, String commandId) {
-        return sensorId + "." + commandId;
-    }
-    
-    /**
-     * Extract a simple string valued command arg 
-     * from a {@link MqttDevice#commands(String...)} returned
-     * JsonObject tuple.
-     * <p>
-     * Interpret the JsonObject's embedded payload as a JsonObject with a single
-     * "value" property.
-     * @param jo the command tuple.
-     * @return the command's argument value 
-     */
-    public String getCommandValueString(JsonObject jo) {
-        return jo.get(CMD_PAYLOAD).getAsJsonObject().get("value").getAsString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/mqtt/DeviceCommsApp.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/mqtt/DeviceCommsApp.java b/samples/apps/src/main/java/edgent/samples/apps/mqtt/DeviceCommsApp.java
deleted file mode 100644
index e399eca..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/mqtt/DeviceCommsApp.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.apps.mqtt;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.iot.QoS;
-import edgent.connectors.mqtt.MqttStreams;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.json.JsonFunctions;
-
-/**
- * An MQTT Device Communications client for watching device events
- * and sending commands.
- * <p>
- * This is an "application properties" aware client that gets MQTT configuration
- * from an Edgent sample app application configuration properties file.
- * <p>
- * This client avoids the need for other MQTT clients (e.g., from a mosquitto
- * installation) to observe and control the applications.
- */
-public class DeviceCommsApp extends AbstractMqttApplication {
-    
-    private static final String usage = "Usage: watch | send <cmdLabel> <cmdArg>";
-
-    private String mode;
-    private String cmdLabel;
-    private String cmdArg;
-    
-    public static void main(String[] args) throws Exception {
-        if (args.length < 1)
-            throw new Exception("missing pathname to application properties file");
-        
-        try {
-            int i = 0;
-            DeviceCommsApp application = new DeviceCommsApp(args[i++]);
-            String mode = args[i++];
-            if (!("watch".equals(mode) || "send".equals(mode))) {
-                throw new IllegalArgumentException("Unsupport mode: "+application.mode);
-            }
-            application.mode = mode;
-            if (application.mode.equals("send")) {
-                application.cmdLabel = args[i++];
-                application.cmdArg = args[i++];
-            }
-        
-            application.run();
-        }
-        catch (IllegalArgumentException | IndexOutOfBoundsException e) {
-            throw new IllegalArgumentException(e.getMessage()
-                    +"\n"+usage);
-        }
-    }
-    
-    /**
-     * Create an application instance.
-     * @param propsPath pathname to an application configuration file
-     * @throws Exception
-     */
-    DeviceCommsApp(String propsPath) throws Exception {
-        super(propsPath);
-    }
-    
-    @Override
-    protected void buildTopology(Topology t) {
-        mqttDevice().getMqttConfig().setClientId(null);
-        MqttStreams mqtt = new MqttStreams(t, () -> mqttDevice().getMqttConfig());
-        if (mode.equals("send")) {
-            String topic = mqttDevice().commandTopic(cmdLabel);
-            JsonObject jo = new JsonObject();
-            jo.addProperty("value", cmdArg);
-            System.out.println("Publishing command: topic="+topic+"  value="+jo);
-            TStream<String> cmd = t.strings(JsonFunctions.asString().apply(jo));
-            mqtt.publish(cmd, topic, QoS.FIRE_AND_FORGET, false/*retain*/);
-            // Hmm... the paho MQTT *non-daemon* threads prevent the app
-            // from exiting after returning from main() following job submit().
-            // Lacking MqttStreams.shutdown() or such...
-            // Delay a bit and then explicitly exit().  Ugh.
-            cmd.sink(tuple -> { 
-                try {
-                    Thread.sleep(3*1000);
-                } catch (Exception e) { }
-                System.exit(0); });
-        }
-        else if (mode.equals("watch")) {
-            String topicFilter = mqttDevice().eventTopic(null);
-            System.out.println("Watching topic filter "+topicFilter);
-            TStream<String> events = mqtt.subscribe(topicFilter, QoS.FIRE_AND_FORGET,
-                    (topic,payload) -> { 
-                        String s = "\n# topic "+topic;
-                        s += "\n" + new String(payload);
-                        return s;
-                    });
-            events.print();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/mqtt/package-info.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/mqtt/package-info.java b/samples/apps/src/main/java/edgent/samples/apps/mqtt/package-info.java
deleted file mode 100644
index 66eb514..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/mqtt/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Base support for Edgent MQTT based application samples.
- * <p>
- * This package builds on {@code edgent.samples.apps} providing
- * additional common capabilities in the area of MQTT based device appliations.
- */
-package edgent.samples.apps.mqtt;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/package-info.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/package-info.java b/samples/apps/src/main/java/edgent/samples/apps/package-info.java
deleted file mode 100644
index 102006b..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/package-info.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Support for some more complex Edgent application samples.
- * <p>
- * This package provides some commonly needed capabilities particularly in
- * the area of an external configuration description influence on various
- * things.
- * <p>
- * Focal areas:
- * <ul>
- * <li>{@link edgent.samples.apps.AbstractApplication} - a base class for
- *     Edgent applications providing commonly needed features.
- *     </li>
- * <li>{@link edgent.samples.apps.TopologyProviderFactory} - a configuration
- *     driven factory for an Edgent topology provider.
- *     </li>
- * <li>{@link edgent.samples.apps.ApplicationUtilities} - some
- *     general configuration driven utilities. 
- *     </li>
- * <li>{@link edgent.samples.apps.JsonTuples} - utilities for wrapping
- *     sensor samples in a JsonObject and operating on it.
- *     </li>
- * </ul>
- */
-package edgent.samples.apps;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/Sensor1.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/Sensor1.java b/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/Sensor1.java
deleted file mode 100644
index 78fefa9..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/Sensor1.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.apps.sensorAnalytics;
-
-import static edgent.analytics.math3.stat.Statistic.MAX;
-import static edgent.analytics.math3.stat.Statistic.MEAN;
-import static edgent.analytics.math3.stat.Statistic.MIN;
-import static edgent.analytics.math3.stat.Statistic.STDDEV;
-import static edgent.samples.apps.JsonTuples.KEY_ID;
-import static edgent.samples.apps.JsonTuples.KEY_READING;
-import static edgent.samples.apps.JsonTuples.KEY_TS;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.math3.util.Pair;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-
-import edgent.analytics.sensors.Range;
-import edgent.analytics.sensors.Ranges;
-import edgent.connectors.iot.QoS;
-import edgent.function.Supplier;
-import edgent.samples.apps.JsonTuples;
-import edgent.samples.utils.sensor.PeriodicRandomSensor;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-
-/**
- * Analytics for "Sensor1".
- * <p>
- * This sample demonstrates some common continuous sensor analytic themes.
- * <p>
- * In this case we have a simulated sensor producing 1000 samples per second
- * of an integer type in the range of 0-255.
- * <p>
- * The processing pipeline created is roughly:
- * <ul>
- * <li>Batched Data Reduction - reduce the sensor's 1000 samples per second
- *     down to 1 sample per second simple statistical aggregation of the readings.
- *     </li>
- * <li>Compute historical information - each 1hz sample is augmented
- *     with a 30 second trailing average of the 1hz readings.
- *     </li>
- * <li>Threshold detection - each 1hz sample's value is compared
- *     against a target range and outliers are identified.
- *     </li>
- * <li>Local logging - outliers are logged to a local file
- *     </li>
- * <li>Publishing results to a MQTT broker:
- *     <ul>
- *     <li>when enabled, invdividual outliers are published.</li>
- *     <li>Every 30 seconds a list of the last 10 outliers is published.</li>
- *     </ul>
- *     </li>
- * </ul>
- * <p>
- * The sample also demonstrates:
- * <ul>
- * <li>Dynamic configuration control - subscribe to a MQTT broker
- *     to receive commands to adjust the threshold detection range value. 
- *     </li>
- * <li>Generally, the configuration of the processing is driven via an
- *     external configuration description.
- *     </li>
- * <li>Conditional stream tracing - configuration controlled inclusion of tracing.
- *     </li>
- * <li>Use of {@link TStream#tag(String...)} to improve information provided by
- *     the Edgent DevelopmentProvider console.</li>
- * </ul>
- */
-public class Sensor1 {
-    private final SensorAnalyticsApplication app;
-    private final Topology t;
-    private final String sensorId = "sensor1";
-
-    public Sensor1(Topology t, SensorAnalyticsApplication app) {
-        this.t = t;
-        this.app = app;
-    }
-    
-    /**
-     * Add the sensor's analytics to the topology.
-     */
-    public void addAnalytics() {
-
-        // Need synchronization for set/get of dynamically changeable values.
-        AtomicReference<Range<Integer>> range = new AtomicReference<>();
-        AtomicReference<Boolean> isPublish1hzOutsideRange = new AtomicReference<>();
-        
-        // Initialize the controls
-        range.set(app.utils().getRangeInteger(sensorId, "outside1hzMeanRange"));
-        isPublish1hzOutsideRange.set(false);
-        
-        // Handle the sensor's device commands
-        app.mqttDevice().commands(commandId("set1hzMeanRangeThreshold"))
-            .tag(commandId("set1hzMeanRangeThresholdCmd"))
-            .sink(jo -> {
-                    Range<Integer> newRange = Ranges.valueOfInteger(getCommandValue(jo));
-                    System.out.println("===== Changing range to "+newRange+" ======");
-                    range.set(newRange);
-                });
-        app.mqttDevice().commands(commandId("setPublish1hzOutsideRange"))
-            .tag(commandId("setPublish1hzOutsideRangeCmd"))
-            .sink(jo -> {
-                    Boolean b = new Boolean(getCommandValue(jo));
-                    System.out.println("===== Changing isPublish1hzOutsideRange to "+b+" ======");
-                    isPublish1hzOutsideRange.set(b);
-                });
-        
-        // Create a raw simulated sensor stream of 1000 tuples/sec.
-        // Each tuple is Pair<Long timestampMsec, sensor-reading (0..255)>.
-        PeriodicRandomSensor simulatedSensorFactory = new PeriodicRandomSensor();
-        TStream<Pair<Long,Integer>> raw1khz = 
-                simulatedSensorFactory.newInteger(t, 1/*periodMsec*/, 255)
-                .tag("raw1khz");
-        traceStream(raw1khz, "raw1khz");
-        
-        // Wrap the raw sensor reading in a JsonObject for convenience.
-        TStream<JsonObject> j1khz = JsonTuples.wrap(raw1khz, sensorId)
-                .tag("j1khz");
-        traceStream(j1khz, "j1khz");
-        
-        // Data-reduction: reduce 1khz samples down to
-        // 1hz aggregate statistics samples.
-        TStream<JsonObject> j1hzStats = j1khz.last(1000, JsonTuples.keyFn())
-                .batch(JsonTuples.statistics(MIN, MAX, MEAN, STDDEV))
-                .tag("1hzStats");
-        
-        // Create a 30 second sliding window of average trailing Mean values
-        // and enrich samples with that information.
-        j1hzStats = j1hzStats.last(30, JsonTuples.keyFn()).aggregate(
-            (samples, key) -> {
-                // enrich and return the most recently added tuple
-                JsonObject jo = samples.get(samples.size()-1);
-                double meanSum = 0;
-                for (JsonObject js : samples) {
-                    meanSum += JsonTuples.getStatistic(js, MEAN).getAsDouble();
-                }
-                jo.addProperty("AvgTrailingMean", Math.round(meanSum / samples.size()));
-                jo.addProperty("AvgTrailingMeanCnt", samples.size());
-                return jo;
-            })
-            .tag("1hzStats.enriched");
-        traceStream(j1hzStats, "j1hzStats");
-
-        // Detect 1hz samples whose MEAN value are
-        // outside the configuration specified range.
-        TStream<JsonObject> outside1hzMeanRange = j1hzStats.filter(
-                sample -> {
-                    int value = JsonTuples.getStatistic(sample, MEAN).getAsInt();
-                    return !range.get().contains(value);
-                })
-                .tag("outside1hzMeanRange");
-        traceStream(outside1hzMeanRange, () -> "outside1hzMeanRange"+range.get()); 
-        
-        // Log every outside1hzMeanRange event
-        app.utils().logStream(outside1hzMeanRange, "ALERT", "outside1hzMeanRange");
-        
-        // Conditionally publish every outside1hzMeanRange event.
-        // Use a pressureReliever to prevent backpressure if the broker
-        // can't be contacted.
-        // TODO enhance MqttDevice with configurable reliever. 
-        app.mqttDevice().events(
-                PlumbingStreams.pressureReliever(
-                    outside1hzMeanRange.filter(tuple -> isPublish1hzOutsideRange.get())
-                                       .tag("outside1hzMeanRangeEvent.conditional"),
-                    tuple -> 0, 30).tag("outside1hzMeanRangeEvent.pressureRelieved"),
-                app.sensorEventId(sensorId, "outside1hzMeanRangeEvent"), QoS.FIRE_AND_FORGET);
-        
-        // Demonstrate periodic publishing of a sliding window if
-        // something changed since it was last published.
-        periodicallyPublishLastNInfo(outside1hzMeanRange, 10, 30,
-                "periodicLastOutsideRangeEvent");
-        
-        // TODO histogram: #alerts over the last 8hr
-
-    }
-    
-    /**
-     * Periodically publish the lastN on a stream.
-     * @param stream tuples to 
-     * @param count sliding window size "lastN"
-     * @param nSec publish frequency
-     * @param event sensor's publish event label
-     */
-    private void periodicallyPublishLastNInfo(TStream<JsonObject> stream, 
-            int count, int nSec, String event) {
-
-        // Demonstrate periodic publishing of a sliding window if
-        // something changed since it was last published.
-
-        // Maintain a sliding window of the last N tuples.
-        // TODO today, windows don't provide "anytime" access to their collection
-        // so maintain our own current copy of the collection that we can
-        // access it when needed.
-        // 
-        List<JsonObject> lastN = Collections.synchronizedList(new ArrayList<>());
-        stream.last(count, JsonTuples.keyFn())
-            .aggregate((samples, key) -> samples)
-            .tag(event+".lastN")
-            .sink(samples -> {
-                    // Capture the new list/window.  
-                    synchronized(lastN) {
-                        lastN.clear();
-                        lastN.addAll(samples);
-                    }
-                });
-    
-        // Publish the lastN (with trimmed down info) every nSec seconds
-        // if anything changed since the last publish.
-        TStream<JsonObject> periodicLastN = 
-                t.poll(() -> 1, nSec, TimeUnit.SECONDS).tag(event+".trigger")
-                .filter(trigger -> !lastN.isEmpty()).tag(event+".changed")
-                .map(trigger -> {
-                    synchronized(lastN) {
-                        // create a single JsonObject with the list
-                        // of reduced-content samples
-                        JsonObject jo = new JsonObject();
-                        jo.addProperty(KEY_ID, sensorId);
-                        jo.addProperty(KEY_TS, System.currentTimeMillis());
-                        jo.addProperty("window", count);
-                        jo.addProperty("pubFreqSec", nSec);
-                        JsonArray ja = new JsonArray();
-                        jo.add("lastN", ja);
-                        for (JsonObject j : lastN) {
-                            JsonObject jo2 = new JsonObject();
-                            ja.add(jo2);
-                            jo2.add(KEY_TS, j.get(KEY_TS));
-                            // reduce size: include only 2 significant digits
-                            jo2.addProperty(KEY_READING, String.format("%.2f", 
-                                JsonTuples.getStatistic(j, MEAN).getAsDouble()));
-                        }
-                        lastN.clear();
-                        return jo;
-                    }
-                })
-                .tag(event);
-
-        traceStream(periodicLastN, event);
-
-        // Use a pressureReliever to prevent backpressure if the broker
-        // can't be contacted.
-        // TODO enhance MqttDevice with configurable reliever. 
-        app.mqttDevice().events(
-                PlumbingStreams.pressureReliever(periodicLastN, tuple -> 0, 30)
-                    .tag(event+".pressureRelieved"),
-                app.sensorEventId(sensorId, event), QoS.FIRE_AND_FORGET);
-    }
-    
-    private String commandId(String commandId) {
-        return app.commandId(sensorId, commandId);
-    }
-    
-    private String getCommandValue(JsonObject jo) {
-        return app.getCommandValueString(jo);
-    }
-    
-    private <T> TStream<T> traceStream(TStream<T> stream, String label) {
-        return traceStream(stream, () -> label); 
-    }
-    
-    private <T> TStream<T> traceStream(TStream<T> stream, Supplier<String> label) {
-        return app.utils().traceStream(stream, sensorId, label); 
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java b/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java
deleted file mode 100644
index 59f60e6..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.apps.sensorAnalytics;
-
-import edgent.samples.apps.mqtt.AbstractMqttApplication;
-import edgent.topology.Topology;
-
-/**
- * A sample application demonstrating some common sensor analytic processing
- * themes.
- */
-public class SensorAnalyticsApplication extends AbstractMqttApplication {
-    
-    public static void main(String[] args) throws Exception {
-        if (args.length != 1)
-            throw new Exception("missing pathname to application properties file");
-        
-        SensorAnalyticsApplication application = new SensorAnalyticsApplication(args[0]);
-        
-        application.run();
-    }
-    
-    /**
-     * Create an application instance.
-     * @param propsPath pathname to an application configuration file
-     * @throws Exception
-     */
-    SensorAnalyticsApplication(String propsPath) throws Exception {
-        super(propsPath);
-    }
-    
-    @Override
-    protected void buildTopology(Topology t) {
-        
-        // Add the "sensor1" analytics to the topology
-        new Sensor1(t, this).addAnalytics();
-        
-        // TODO Add the "sensor2" analytics to the topology
-        // TODO Add the "sensor3" analytics to the topology
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/package-info.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/package-info.java b/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/package-info.java
deleted file mode 100644
index a6bfa60..0000000
--- a/samples/apps/src/main/java/edgent/samples/apps/sensorAnalytics/package-info.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * The Sensor Analytics sample application demonstrates some common 
- * continuous sensor analytic application themes.
- * See {@link edgent.samples.apps.sensorAnalytics.Sensor1 Sensor1} for the
- * core of the analytics processing and  
- * {@link edgent.samples.apps.sensorAnalytics.SensorAnalyticsApplication
- * SensorAnalyticsApplication}
- * for the main program.
- * <p>
- * The themes include:
- * <ul>
- * <li>Batched Data Reduction - reducing higher frequency sensor reading
- *     samples down to a lower frequency using statistical aggregations
- *     of the raw readings.
- *     </li>
- * <li>Computing continuous historical statistics such as a
- *     30 second trailing average of sensor readings.
- *     </li>
- * <li>Outlier / threshold detection against a configurable range</li>
- * <li>Local logging of stream tuples</li>
- * <li>Publishing analytic results to an MQTT broker</li>
- * <li>Dynamic configuration control - subscribing to a MQTT broker
- *     to receive commands to adjust the threshold detection range value. 
- *     </li>
- * <li>Generally, the configuration of the processing is driven via an
- *     external configuration description.
- *     </li>
- * <li>Conditional stream tracing - configuration controlled inclusion of tracing.
- *     </li>
- * </ul>
- * 
- * <h2>Prerequisites:</h2>
- * <p>
- * The default configuration is for a local MQTT broker.
- * A good resource is <a href="http://mosquitto.org">mosquitto.org</a>
- * if you want to download and setup your own MQTT broker.
- * Or you can use some other broker available in your environment.
- * <p>
- * Alternatively, there are some public MQTT brokers available to experiment with.
- * Their availability status isn't guaranteed.  If you're unable to connect
- * to the broker, it's likely that it isn't up or your firewalls don't
- * allow you to connect.  DO NOT PUBLISH ANYTHING SENSITIVE - anyone
- * can be listing.  A couple of public broker locations are noted
- * in the application's properties file.
- * <p>
- * The default {@code mqttDevice.topic.prefix} value, used by default in 
- * generated MQTT topic values and MQTT clientId, contains the user's
- * local login id.  The SensorAnalytics sample application does not have any
- * other sensitive information.
- * <p>
- * Edit {@code <edgent-release>/java8/scripts/apps/sensorAnalytics/sensoranalytics.properties}
- * to change the broker location or topic prefix.
- * 
- * <h2>Application output:</h2>
- * <p>
- * The application periodically (every 30sec), publishes a list of
- * the last 10 outliers to MQTT.  When enabled, it also publishes 
- * full details of individual outliers as they occur.
- * It also subscribes to MQTT topics for commands to dynamically change the
- * threshold range and whether to publish individual outliers.
- * <p>
- * All MQTT configuration information, including topic patterns,
- * are in the application.properties file.
- * <p>
- * The application logs outlier events in local files.  The actual location
- * is specified in the application.properties file.
- * <p>
- * The application generates some output on stdout and stderr.
- * The information includes:
- * <ul>
- * <li>MQTT device info. Lines 1 through 5 in the sample console output below.</li>
- * <li>URL for the Edgent development console.  Line 6.</li>
- * <li>Trace of the outlier event stream. Line 7.
- *     The output is a label, which includes the active threshold range,
- *     followed by the event's JSON.
- *     These are the events that will also be logged and conditionally published
- *     as well as included in the periodic lastN info published every 30sec.
- *     </li>
- * <li>Announcement when a "change threshold" or "enable publish of 1khz outliers"
- *     command is received and processed.
- *     Line 8 and 9. 
- *     </li>
- * <li>At this time some INFO trace output from the MQTT connector</li>
- * <li>At this time some INFO trace output from the File connector</li>
- * </ul>
- * Sample console output:
- * <pre>{@code
- * [1] MqttDevice serverURLs [tcp://localhost:1883]
- * [2] MqttDevice clientId id/012345
- * [3] MqttDevice deviceId 012345
- * [4] MqttDevice event topic pattern id/012345/evt/+/fmt/json
- * [5] MqttDevice command topic pattern id/012345/cmd/+/fmt/json
- * [6] Edgent Console URL for the job: http://localhost:57324/console
- * [7] sensor1.outside1hzMeanRange[124..129]: {"id":"sensor1","reading":{"N":1000,"MIN":0.0,"MAX":254.0,"MEAN":130.23200000000006,"STDDEV":75.5535473324351},"msec":1454623874408,"agg.begin.msec":1454623873410,"agg.count":1000,"AvgTrailingMean":128,"AvgTrailingMeanCnt":4}
- * ...
- * [8] ===== Changing range to [125..127] ======
- * sensor1.outside1hzMeanRange[125..127]: {"id":"sensor1","reading":{"N":1000,"MIN":0.0,"MAX":254.0,"MEAN":129.00099999999978,"STDDEV":74.3076080870567},"msec":1454624142419,"agg.begin.msec":1454624141420,"agg.count":1000,"AvgTrailingMean":127,"AvgTrailingMeanCnt":30}
- * [9] ===== Changing isPublish1hzOutsideRange to true ======
- * ...
- * }</pre>
- * 
- * <h2>Running, observing and controlling the application:</h2>
- * <pre>{@code
- * $ ./runSensorAnalytics.sh
- * }</pre>
- * <p>
- * To observe the locally logged outlier events:
- * <pre>{@code
- * $ tail -f /tmp/SensorAnalytics/logs/.outside1hzMeanRange
- * }</pre>
- * <p>
- * To observe the events that are getting published to MQTT:
- * <pre>{@code
- * $ ./runDeviceComms.sh watch
- * }</pre>
- * <p>
- * To change the outlier threshold setting:
- * <br>The command value is the new range string: {@code [<lowerBound>..<upperBound>]}.
- * <pre>{@code
- * $ ./runDeviceComms.sh send sensor1.set1hzMeanRangeThreshold "[125..127]"
- * }</pre>
- * <p>
- * To change the "publish individual 1hz outliers" control:
- * <pre>{@code
- * $ ./runDeviceComms.sh send sensor1.setPublish1hzOutsideRange true
- * }</pre>
- * 
- * <h3>Alternative MQTT clients</h3>
- * You can use any MQTT client but you will have to specify the 
- * MQTT server, the event topics to watch / subscribe to, and the command topics
- * and JSON for publish commands.  The MqttDevice output above provides most
- * of the necessary information.
- * <p>
- * For example, the {@code mosquitto_pub} and
- * {@code mosquitto_sub} commands equivalent to the above runDeviceComms.sh
- * commands are:
- * <pre>{@code
- * # Watch the device's event topics
- * $ /usr/local/bin/mosquitto_sub -t id/012345/evt/+/fmt/json
- * # change the outlier threshold setting
- * $ /usr/local/bin/mosquitto_pub -m '{"value":"[125..127]"}' -t id/012345/cmd/sensor1.set1hzMeanRangeThreshold/fmt/json
- * # change the "publish individual 1hz outliers" control
- * $ /usr/local/bin/mosquitto_pub -m '{"value":"true"}' -t id/012345/cmd/sensor1.setPublish1hzOutsideRange/fmt/json
- * }</pre>
- */
-package edgent.samples.apps.sensorAnalytics;

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/AbstractApplication.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/AbstractApplication.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/AbstractApplication.java
new file mode 100644
index 0000000..10099dd
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/AbstractApplication.java
@@ -0,0 +1,130 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Properties;
+
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.samples.apps.mqtt.AbstractMqttApplication;
+import org.apache.edgent.topology.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Application base class.
+ * <p>
+ * Application instances need to:
+ * <ul>
+ * <li>define an implementation for {@link #buildTopology(Topology)}</li>
+ * <li>call {@link #run()} to build and submit the topology for execution.</li>
+ * </ul>
+ * <p>
+ * The class provides some common processing needs:
+ * <ul>
+ * <li>Support for an external configuration file</li>
+ * <li>Provides a {@link TopologyProviderFactory}</li>
+ * <li>Provides a {@link ApplicationUtilities}</li>
+ * </ul>
+ * @see AbstractMqttApplication
+ */
+public abstract class AbstractApplication {
+    
+    protected final String propsPath;
+    protected final Properties props;
+    private final ApplicationUtilities applicationUtilities;
+    private static final Logger logger = LoggerFactory.getLogger(AbstractApplication.class);
+
+    protected Topology t;
+    
+    public AbstractApplication(String propsPath) throws Exception {
+        this.propsPath = propsPath;
+        props = new Properties();
+        props.load(Files.newBufferedReader(new File(propsPath).toPath()));
+        applicationUtilities = new ApplicationUtilities(props);
+    }
+    
+    /**
+     * Construct and run the application's topology.
+     * @throws Exception on failure
+     */
+    protected void run() throws Exception {
+// TODO need to setup logging to squelch stderr output from the runtime/connectors, 
+// including paho output
+
+        TopologyProviderFactory tpFactory = new TopologyProviderFactory(props);
+        
+        DirectProvider tp = tpFactory.newProvider();
+        
+        // Create a topology for the application
+        t = tp.newTopology(config().getProperty("application.name"));
+        
+        preBuildTopology(t);
+        
+        buildTopology(t);
+        
+        // Run the topology
+        HttpServer httpServer = tp.getServices().getService(HttpServer.class);
+        if (httpServer != null) {
+            System.out.println("Edgent Console URL for the job: "
+                                + httpServer.getConsoleUrl());
+        }
+        tp.submit(t);
+    }
+    
+    /**
+     * Get the application's raw configuration information.
+     * @return the configuration
+     */
+    public Properties config() {
+        return props;
+    }
+    
+    /**
+     * Get the application's 
+     * @return the helper
+     */
+    public ApplicationUtilities utils() {
+        return applicationUtilities;
+    }
+
+    /**
+     * A hook for a subclass to do things prior to the invocation
+     * of {@link #buildTopology(Topology)}.
+     * <p>
+     * The default implementation is a no-op.
+     * @param t the application's topology
+     */
+    protected void preBuildTopology(Topology t) {
+        return;
+    }
+    
+    /**
+     * Build the application's topology.
+     * @param t Topology to add to
+     */
+    abstract protected void buildTopology(Topology t);
+    
+    public void handleRuntimeError(String msg, Exception e) {
+        logger.error("A runtime error occurred", e);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/ApplicationUtilities.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/ApplicationUtilities.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/ApplicationUtilities.java
new file mode 100644
index 0000000..b8a0cee
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/ApplicationUtilities.java
@@ -0,0 +1,255 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps;
+
+import java.io.File;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.edgent.analytics.sensors.Range;
+import org.apache.edgent.analytics.sensors.Ranges;
+import org.apache.edgent.connectors.file.FileStreams;
+import org.apache.edgent.connectors.file.FileWriterCycleConfig;
+import org.apache.edgent.connectors.file.FileWriterFlushConfig;
+import org.apache.edgent.connectors.file.FileWriterPolicy;
+import org.apache.edgent.connectors.file.FileWriterRetentionConfig;
+import org.apache.edgent.function.Predicate;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TStream;
+
+/**
+ * Some general purpose application configuration driven utilities.
+ * <p>
+ * Utilities include:
+ * <ul>
+ * <li>Get a property name for a sensor configuration item</li>
+ * <li>Get a Range value for a sensor range item</li>
+ * <li>Log a stream</li>
+ * <li>Conditionally trace a stream</li>
+ * </ul>
+ */
+public class ApplicationUtilities {
+    
+    private final Properties props;
+    
+    public ApplicationUtilities(Properties props) {
+        this.props = props;
+    }
+
+    private Properties config() {
+        return props;
+    }
+    
+    /**
+     * Trace a stream to System.out if the sensor id's "label" has been configured
+     * to enable tracing.
+     * <p>
+     * If tracing has not been enabled in the config, the topology will not
+     * be augmented to trace the stream.
+     *
+     * @param <T> Tuple type
+     * @param stream the stream to trace
+     * @param sensorId the sensor id
+     * @param label some unique label
+     * @return the input stream
+     */
+    public <T> TStream<T> traceStream(TStream<T> stream, String sensorId, Supplier<String> label) {
+        return traceStream(stream, () -> sensorId+"."+label.get());
+    }
+
+    /**
+     * Trace a stream to System.out if the "label" has been configured
+     * to enable tracing.
+     * <p>
+     * If tracing has not been enabled in the config, the topology will not
+     * be augmented to trace the stream.
+     * 
+     * @param <T> Tuple type
+     * @param stream the stream to trace
+     * @param label some unique label
+     * @return the input stream
+     */
+    public <T> TStream<T> traceStream(TStream<T> stream, Supplier<String> label) {
+        if (includeTraceStreamOps(label.get())) {
+            TStream<?> s = stream.filter(traceTuplesFn(label.get())).tag(label.get()+".trace");
+            s.peek(sample -> System.out.println(String.format("%s: %s", label.get(), sample.toString())));
+        }
+        return stream;
+    }
+    
+    private boolean includeTraceStreamOps(String label) {
+        String includesCsv = config().getProperty("stream.tracing.includes.csv", "");
+        String includesRegex = config().getProperty("stream.tracing.includes.regex", "");
+        String excludesCsv = config().getProperty("stream.tracing.excludes.csv", "");
+        String excludesRegex = config().getProperty("stream.tracing.excludes.regex", "");
+        
+        Set<String> includesSet = new HashSet<>();
+        for (String s : includesCsv.split(","))
+            includesSet.add(s.trim());
+        Set<String> excludesSet = new HashSet<>();
+        for (String s : excludesCsv.split(","))
+            excludesSet.add(s.trim());
+        
+        boolean isIncluded = false;
+        if (includesSet.contains(label) || label.matches(includesRegex))
+            isIncluded = true;
+        if (excludesSet.contains(label) || label.matches(excludesRegex))
+            isIncluded = false;
+        
+        return isIncluded;
+    }
+    
+    private <T> Predicate<T> traceTuplesFn(String label) {
+        return tuple -> true; // TODO make dynamic config; affected by "label" value
+        // check label for match against csv or regex from props
+    }
+    
+    /**
+     * Get the property name for a sensor's configuration item.
+     * @param sensorId the sensor's id
+     * @param label the label for an instance of "kind" (e.g., "tempThreshold")
+     * @param kind the kind of configuration item (e.g., "range")
+     * @return the configuration property name
+     */
+    public String getSensorPropertyName(String sensorId, String label, String kind) {
+        String name = kind + "." + label;  // kind.label
+        if (sensorId!=null && !sensorId.isEmpty())
+            name = sensorId + "." + name;  // sensorId.kind.label
+        return name;
+    }
+
+    private String getSensorConfigValue(String sensorId, String label, String kind) {
+        String name = getSensorPropertyName(sensorId, label, kind);
+        String val = config().getProperty(name);
+        if (val==null)
+            throw new IllegalArgumentException("Missing configuration property "+name);
+        return val;
+    }
+    
+    /**
+     * Get the Range for a sensor range configuration item.
+     * @param sensorId the sensor's id
+     * @param label the range's label
+     * @return the Range
+     */
+    public Range<Integer> getRangeInteger(String sensorId, String label) {
+        String val = getSensorConfigValue(sensorId, label, "range");
+        return Ranges.valueOfInteger(val);
+    }
+    
+    /**
+     * Get the Range for a sensor range configuration item.
+     * @param sensorId the sensor's id
+     * @param label the range's label
+     * @return the Range
+     */
+    public Range<Byte> getRangeByte(String sensorId, String label) {
+        String val = getSensorConfigValue(sensorId, label, "range");
+        return Ranges.valueOfByte(val);
+    }
+    
+    /**
+     * Get the Range for a sensor range configuration item.
+     * @param sensorId the sensor's id
+     * @param label the range's label
+     * @return the Range
+     */
+    public Range<Short> getRangeShort(String sensorId, String label) {
+        String val = getSensorConfigValue(sensorId, label, "range");
+        return Ranges.valueOfShort(val);
+    }
+    
+    /**
+     * Get the Range for a sensor range configuration item.
+     * @param sensorId the sensor's id
+     * @param label the range's label
+     * @return the Range
+     */
+    public Range<Float> getRangeFloat(String sensorId, String label) {
+        String val = getSensorConfigValue(sensorId, label, "range");
+        return Ranges.valueOfFloat(val);
+    }
+    
+    /**
+     * Get the Range for a sensor range configuration item.
+     * @param sensorId the sensor's id
+     * @param label the range's label
+     * @return the Range
+     */
+    public Range<Double> getRangeDouble(String sensorId, String label) {
+        String val = getSensorConfigValue(sensorId, label, "range");
+        return Ranges.valueOfDouble(val);
+    }
+
+    /**
+     * Log every tuple on the stream using the {@code FileStreams} connector.
+     * <p>
+     * The logs are added to the directory as specified
+     * by the "application.log.dir" property.
+     * The directory will be created as needed.
+     * <p>
+     * The "active" (open / being written) log file name is {@code .<baseName>}.
+     * <br>
+     * Completed stable logs have a name of {@code <baseName>_YYYYMMDD_HHMMSS}.
+     * <p>
+     * The log entry format being used is:
+     * {@code [<date>] [<eventTag>] <tuple>.toString()}
+     * <p>
+     * See {@link FileStreams#textFileWriter(TStream, org.apache.edgent.function.Supplier, org.apache.edgent.function.Supplier)}
+     * 
+     * @param <T> Tuple type
+     * @param stream the TStream
+     * @param baseName the base log name
+     * @param eventTag a tag that gets added to the log entry
+     * @return the input stream
+     */
+    public <T> TStream<T> logStream(TStream<T> stream, String eventTag, String baseName) {
+        // Define the writer policy.
+        // TODO could make the policy configurable via config()
+        FileWriterPolicy<String> policy = new FileWriterPolicy<String>(
+                FileWriterFlushConfig.newTimeBasedConfig(2_000/*msec*/), // flush every 2sec
+                FileWriterCycleConfig.newFileSizeBasedConfig(10_000),  // new file every 10KB
+                FileWriterRetentionConfig.newFileCountBasedConfig(1)   // retain 1 file
+                );
+        
+        // Compose the base file pathname
+        File dir = new File(config().getProperty("application.log.dir"));
+        String basePathname = new File(dir, baseName).toString();
+         
+        // Transform the stream to a TStream<String> of string log entry values
+        TStream<String> stringEntries = stream.map(sample -> String.format("[%s] [%s] %s", new Date().toString(), eventTag, sample.toString()))
+                .tag(baseName+".log");
+
+        // Use the FileStreams connector to write the logs.
+        //
+        // A hack for getting the log directories created at runtime
+        // TODO add another policy thing... or simply make textFileWriter do it?
+        //
+        FileStreams.textFileWriter(stringEntries,
+                () -> { if (!dir.exists()) dir.mkdirs();
+                        return basePathname;
+                      },
+                () -> policy);
+        
+        return stream;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/JsonTuples.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/JsonTuples.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/JsonTuples.java
new file mode 100644
index 0000000..0c231c8
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/JsonTuples.java
@@ -0,0 +1,196 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps;
+
+import static org.apache.edgent.analytics.math3.stat.Statistic.MAX;
+import static org.apache.edgent.analytics.math3.stat.Statistic.MEAN;
+import static org.apache.edgent.analytics.math3.stat.Statistic.MIN;
+import static org.apache.edgent.analytics.math3.stat.Statistic.STDDEV;
+
+import java.util.List;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.edgent.analytics.math3.json.JsonAnalytics;
+import org.apache.edgent.analytics.math3.stat.Statistic;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.topology.TStream;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * Utilties to ease working working with sensor "samples" by wrapping them
+ * in JsonObjects.
+ * <p>
+ * The Json Tuple sensor "samples" have a standard collection of properties.
+ */
+public class JsonTuples {
+        
+    /*
+     * Common attributes in the JsonObject
+     */
+    public static final String KEY_ID = "id";
+    public static final String KEY_TS = "msec";
+    public static final String KEY_READING = "reading";
+    public static final String KEY_AGG_BEGIN_TS = "agg.begin.msec";
+    public static final String KEY_AGG_COUNT = "agg.count";
+    
+    /**
+     * Create a JsonObject wrapping a raw {@code Pair<Long msec,T reading>>} sample.
+     * @param <T> Tuple type
+     * @param sample the raw sample
+     * @param id the sensor's Id
+     * @return the wrapped sample
+     */
+    public static <T> JsonObject wrap(Pair<Long,T> sample, String id) {
+        JsonObject jo = new JsonObject();
+        jo.addProperty(KEY_ID, id);
+        jo.addProperty(KEY_TS, sample.getFirst());
+        T value = sample.getSecond();
+        if (value instanceof Number)
+            jo.addProperty(KEY_READING, (Number)sample.getSecond());
+        else if (value instanceof String)
+            jo.addProperty(KEY_READING, (String)sample.getSecond());
+        else if (value instanceof Boolean)
+            jo.addProperty(KEY_READING, (Boolean)sample.getSecond());
+//        else if (value instanceof array) {
+//            // TODO cvt to JsonArray
+//        }
+//        else if (value instanceof Object) {
+//            // TODO cvt to JsonObject
+//        }
+        else {
+            Class<?> clazz = value != null ? value.getClass() : Object.class;
+            throw new IllegalArgumentException("Unhandled value type: "+ clazz);
+        }
+        return jo;
+    }
+    
+    /**
+     * Create a stream of JsonObject wrapping a stream of 
+     * raw {@code Pair<Long msec,T reading>>} samples.
+     *
+     * @param <T> Tuple type
+     * @param stream the raw input stream
+     * @param id the sensor's Id
+     * @return the wrapped stream
+     */
+    public static <T> TStream<JsonObject> wrap(TStream<Pair<Long,T>> stream, String id) {
+        return stream.map(pair -> wrap(pair, id));
+    }
+    
+    /**
+     * The partition key function for wrapped sensor samples.
+     * <p>
+     * The {@code KEY_ID} property is returned for the key.
+     * @return the function
+     */
+    public static Function<JsonObject,String> keyFn() {
+        return sample -> sample.get(KEY_ID).getAsString();
+    }
+    
+    
+    /**
+     * Get a statistic value from a sample.
+     * <p>
+     * Same as {@code getStatistic(jo, JsonTuples.KEY_READING, stat)}.
+     * 
+     * @param jo the sample
+     * @param stat the Statistic of interest
+     * @return the JsonElement for the Statistic
+     * @throws RuntimeException of the stat isn't present
+     */
+    public static JsonElement getStatistic(JsonObject jo, Statistic stat) {
+        return getStatistic(jo, JsonTuples.KEY_READING, stat);
+    }
+    
+    /**
+     * Get a statistic value from a sample.
+     * <p>
+     * Convenience for working with samples containing a property
+     * whose value is one or more {@link Statistic}
+     * as created by 
+     * {@link JsonAnalytics#aggregate(org.apache.edgent.topology.TWindow, String, String, org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate...) JsonAnalytics.aggregate()}
+     * 
+     * @param jo the sample
+     * @param valueKey the name of the property containing the JsonObject of Statistics
+     * @param stat the Statistic of interest
+     * @return the JsonElement for the Statistic
+     * @throws RuntimeException of the stat isn't present
+     */
+    public static JsonElement getStatistic(JsonObject jo, String valueKey, Statistic stat) {
+        JsonObject statsjo = jo.get(valueKey).getAsJsonObject();
+        return statsjo.get(stat.name());
+    }
+
+    /**
+     * Create a function that computes the specified statistics on the list of
+     * samples and returns a new sample containing the result.
+     * <p>
+     * The single tuple contains the specified statistics computed over
+     * all of the {@code JsonTuple.KEY_READING} 
+     * values from {@code List<JsonObject>}.
+     * <p>
+     * The resulting sample contains the properties:
+     * <ul>
+     * <li>JsonTuple.KEY_ID</li>
+     * <li>JsonTuple.KEY_MSEC - msecTimestamp of the last sample in the window</li>
+     * <li>JsonTuple.KEY_AGG_BEGIN_MSEC - msecTimestamp of the first sample in the window</li>
+     * <li>JsonTuple.KEY_AGG_COUNT - number of samples in the window ({@code value=factor})</li>
+     * <li>JsonTuple.KEY_READING - a JsonObject of the statistics
+     *                      as defined by
+     *                     {@link JsonAnalytics#aggregate(org.apache.edgent.topology.TWindow, String, String, org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate...) JsonAnalytics.aggregate()}
+     * </ul>
+     * <p>
+     * Sample use:
+     * <pre>{@code
+     * TStream<JsonObject> s = ...
+     * // reduce s by a factor of 100 with stats MEAN and STDEV 
+     * TStream<JsonObject> reduced = s.batch(100, statistics(Statistic.MEAN, Statistic.STDDEV));
+     * }</pre>
+     * 
+     * @param statistics the statistics to calculate over the window
+     * @return {@code TStream<JsonObject>} for the reduced {@code stream}
+     */
+    public static BiFunction<List<JsonObject>,String,JsonObject> statistics(Statistic... statistics) {
+        BiFunction<List<JsonObject>,JsonElement,JsonObject> statsFn = 
+                JsonAnalytics.aggregateList(KEY_ID, KEY_READING,
+                    j -> j.get(KEY_READING).getAsDouble(), 
+                    MIN, MAX, MEAN, STDDEV);
+
+        return (samples, key) -> {
+                    JsonObject jo = statsFn.apply(samples, samples.get(0).get(KEY_ID));
+                    JsonTuples.addAggStdInfo(jo, samples);
+                    return jo;
+                };
+    }
+
+    private static void addAggStdInfo(JsonObject jo, List<JsonObject> samples) {
+        // beginMsec, endMsec, nSamples
+        long msec = samples.get(0).get(KEY_TS).getAsLong();
+        long msec2 = samples.get(samples.size()-1).get(KEY_TS).getAsLong();
+        int nSamples = samples.size();
+        
+        jo.addProperty(KEY_TS, msec2);
+        jo.addProperty(KEY_AGG_BEGIN_TS, msec);
+        jo.addProperty(KEY_AGG_COUNT, nSamples);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/TopologyProviderFactory.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/TopologyProviderFactory.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/TopologyProviderFactory.java
new file mode 100644
index 0000000..1128fcb
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/TopologyProviderFactory.java
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps;
+
+import java.util.Properties;
+
+import org.apache.edgent.providers.direct.DirectProvider;
+
+/**
+ * A configuration driven factory for an Edgent topology provider.
+ */
+public class TopologyProviderFactory {
+    private final Properties props;
+    
+    /**
+     * Construct a factory
+     * @param props configuration information.
+     */
+    public TopologyProviderFactory(Properties props) {
+        this.props = props;
+    }
+    
+    /**
+     * Get a new topology provider.
+     * <p>
+     * The default provider is {@code org.apache.edgent.providers.direct.DirectProvider}.
+     * <p>
+     * The {@code topology.provider} configuration property can specify
+     * an alternative.
+     * 
+     * @return the provider
+     * @throws Exception if the provider couldn't be created
+     */
+    public DirectProvider newProvider() throws Exception {
+        String name = props.getProperty("topology.provider", "org.apache.edgent.providers.direct.DirectProvider");
+        Class<?> clazz = null;
+        try {
+            clazz = Class.forName(name);
+        }
+        catch (ClassNotFoundException e) {
+            String msg = "Class not found: "+e.getLocalizedMessage();
+            System.err.println(msg);
+            throw new IllegalStateException(msg);
+        }
+        return (DirectProvider) clazz.newInstance();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/applicationTemplate.properties
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/applicationTemplate.properties b/samples/apps/src/main/java/org/apache/edgent/samples/apps/applicationTemplate.properties
new file mode 100644
index 0000000..c45d3db
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/applicationTemplate.properties
@@ -0,0 +1,81 @@
+# A template file for Application Configuration properties
+#
+# The default Edgent topology provider is DirectProvider
+#topology.provider=org.apache.edgent.providers.development.DevelopmentProvider
+#
+application.name=MyAnalytics
+#
+
+# =========================================================================
+# Application stream logging configuration
+# Where the app puts its stream logs.  
+# The directory will be created when the topology
+# runs if it doesn't already exist.
+application.log.dir=/tmp/MyAnalytics/logs
+
+# =========================================================================
+# Application "ranges" - e.g., for threshold detections
+# Specify values generated by Range.toString():
+#  <lowerBoundType><lowerBound>..<upperBound><upperBoundType>
+#  where
+#      lowerBoundType is "[" inclusive or "(" exclusive
+#      upperBoundType is "]" inclusive or ")" exclusive
+#      lowerBound or upperBound is "*" for open ranges,
+#         e.g., [*..50]  for "atMost" 50
+#
+sensor1.range.outside1hzMeanRange=[124..129]
+
+# =========================================================================
+# MQTT Device and Connector configuration info.
+#
+# MQTT Device -- See org.apache.edgent.connectors.mqtt.device.MqttDevice for all
+# of the properties.
+#
+# An optional topic prefix.  It can be used to isolate users or applications
+# in shared MQTT broker configurations.  By default it is incorporated
+# into device topics and the MQTT clientId.
+# If you use a public MQTT broker you may want to change the topic
+# prefix so it is still unique for you but doesn't include the
+# user name or application name.
+mqttDevice.topic.prefix=ibm.xyzzy-streams.samples/user/{user.name}/{application.name}/
+#
+# The device id used for identifying the device's events and commands
+# in the MQTT topic namespace.
+# By default it also gets incorporated into the MQTT clientId value.
+mqttDevice.id=012345
+#
+# The MQTT clientId.  Only one instance of a MqttDevice can connect
+# to the MQTT broker with a given clientId.
+#mqttDevice.mqtt.clientId={mqttDevice.topic.prefix}id/{mqttDevice.id}
+#
+# MQTT Connector  See org.apache.edgent.connectors.mqtt.MqttConfig.fromProperties()
+#
+# The default configuration is for a local MQTT broker.
+# See mosquitto.org for instructions on downloading a MQTT broker.
+# Or use some other MQTT broker available in your environment.
+mqtt.serverURLs=tcp://localhost:1883
+#
+# Alternatively, there are some public MQTT brokers available to experiment with.
+# Their availability status isn't guaranteed.  If you're unable to connect
+# to the broker, it's likely that it isn't up or your firewalls don't
+# allow you to connect.  DO NOT PUBLISH ANYTHING SENSITIVE - anyone
+# can be listing.
+#mqtt.serverURLs=tcp://iot.eclipse.org:1883
+#mqtt.serverURLs=tcp://test.mosquitto.org:1883
+#
+#mqtt.userName=xyzzy
+#mqtt.password=myMosquittoPw
+
+# =========================================================================
+# Patterns for identifying which streams to trace to System.out
+# To enable use include.csv and/or includes.regex.
+# To exclude an otherwise included file, use excludes.csv and/or excludes.regex
+#
+# Some tracing labels
+# sensor1.raw1khz,sensor1.j1khz,sensor1.j1hzStats,sensor1.outside1hzMeanRange*,
+# sensor1.periodicLastN*
+#
+#stream.tracing.includes.csv=sensor1.j1hzStats
+stream.tracing.includes.regex=sensor1.outside1hzMeanRange.*
+#stream.tracing.excludes.regex=.*
+#stream.tracing.excludes.csv=sensor1.raw1khz

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/AbstractMqttApplication.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/AbstractMqttApplication.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/AbstractMqttApplication.java
new file mode 100644
index 0000000..6bd943a
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/AbstractMqttApplication.java
@@ -0,0 +1,121 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps.mqtt;
+
+import static org.apache.edgent.connectors.iot.IotDevice.CMD_PAYLOAD;
+
+import java.util.Arrays;
+
+import org.apache.edgent.connectors.mqtt.iot.MqttDevice;
+import org.apache.edgent.samples.apps.AbstractApplication;
+import org.apache.edgent.samples.apps.ApplicationUtilities;
+import org.apache.edgent.samples.apps.TopologyProviderFactory;
+import org.apache.edgent.topology.Topology;
+
+import com.google.gson.JsonObject;
+
+/**
+ * An MQTT Application base class.
+ * <p>
+ * Application instances need to:
+ * <ul>
+ * <li>define an implementation for {@link #buildTopology(Topology)}</li>
+ * <li>call {@link #run()} to build and submit the topology for execution.</li>
+ * </ul>
+ * <p>
+ * The class provides some common processing needs:
+ * <ul>
+ * <li>Support for an external configuration file</li>
+ * <li>Provides a {@link TopologyProviderFactory}</li>
+ * <li>Provides a {@link ApplicationUtilities}</li>
+ * <li>Provides a {@link MqttDevice}</li>
+ * </ul>
+ */
+public abstract class AbstractMqttApplication extends AbstractApplication {
+    
+    private MqttDevice mqttDevice;
+    
+    public AbstractMqttApplication(String propsPath) throws Exception {
+        super(propsPath);
+    }
+    
+    @Override
+    protected void preBuildTopology(Topology t) {
+        // Add an MQTT device communication manager to the topology
+        updateTopicPrefix();
+        mqttDevice = new MqttDevice(t, props);
+        System.out.println("MqttDevice serverURLs " + Arrays.toString(mqttDevice.getMqttConfig().getServerURLs()));
+        System.out.println("MqttDevice clientId " + mqttDevice.getMqttConfig().getClientId());
+        System.out.println("MqttDevice deviceId " + props.getProperty("mqttDevice.id"));
+        System.out.println("MqttDevice event topic pattern " + mqttDevice.eventTopic(null));
+        System.out.println("MqttDevice command topic pattern " + mqttDevice.commandTopic(null));
+    }
+    
+    /**
+     * Get the application's MqttDevice
+     * @return the MqttDevice
+     */
+    public MqttDevice mqttDevice() {
+        return mqttDevice;
+    }
+    
+    private void updateTopicPrefix() {
+        String val = props.getProperty("mqttDevice.topic.prefix");
+        if (val != null) {
+            val = val.replace("{user.name}", System.getProperty("user.name"));
+            val = val.replace("{application.name}", props.getProperty("application.name"));
+            props.setProperty("mqttDevice.topic.prefix", val);
+        }
+    }
+    
+    /**
+     * Compose a MqttDevice eventId for the sensor.
+     * @param sensorId the sensor id
+     * @param eventId the sensor's eventId
+     * @return the device eventId
+     */
+    public String sensorEventId(String sensorId, String eventId) {
+        return sensorId + "." + eventId;
+    }
+    
+    /**
+     * Compose a MqttDevice commandId for the sensor
+     * @param sensorId the sensor id
+     * @param commandId the sensor's commandId
+     * @return the device commandId
+     */
+    public String commandId(String sensorId, String commandId) {
+        return sensorId + "." + commandId;
+    }
+    
+    /**
+     * Extract a simple string valued command arg 
+     * from a {@link MqttDevice#commands(String...)} returned
+     * JsonObject tuple.
+     * <p>
+     * Interpret the JsonObject's embedded payload as a JsonObject with a single
+     * "value" property.
+     * @param jo the command tuple.
+     * @return the command's argument value 
+     */
+    public String getCommandValueString(JsonObject jo) {
+        return jo.get(CMD_PAYLOAD).getAsJsonObject().get("value").getAsString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/DeviceCommsApp.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/DeviceCommsApp.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/DeviceCommsApp.java
new file mode 100644
index 0000000..70deaa4
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/DeviceCommsApp.java
@@ -0,0 +1,114 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps.mqtt;
+
+import org.apache.edgent.connectors.iot.QoS;
+import org.apache.edgent.connectors.mqtt.MqttStreams;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.json.JsonFunctions;
+
+import com.google.gson.JsonObject;
+
+/**
+ * An MQTT Device Communications client for watching device events
+ * and sending commands.
+ * <p>
+ * This is an "application properties" aware client that gets MQTT configuration
+ * from an Edgent sample app application configuration properties file.
+ * <p>
+ * This client avoids the need for other MQTT clients (e.g., from a mosquitto
+ * installation) to observe and control the applications.
+ */
+public class DeviceCommsApp extends AbstractMqttApplication {
+    
+    private static final String usage = "Usage: watch | send <cmdLabel> <cmdArg>";
+
+    private String mode;
+    private String cmdLabel;
+    private String cmdArg;
+    
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1)
+            throw new Exception("missing pathname to application properties file");
+        
+        try {
+            int i = 0;
+            DeviceCommsApp application = new DeviceCommsApp(args[i++]);
+            String mode = args[i++];
+            if (!("watch".equals(mode) || "send".equals(mode))) {
+                throw new IllegalArgumentException("Unsupport mode: "+application.mode);
+            }
+            application.mode = mode;
+            if (application.mode.equals("send")) {
+                application.cmdLabel = args[i++];
+                application.cmdArg = args[i++];
+            }
+        
+            application.run();
+        }
+        catch (IllegalArgumentException | IndexOutOfBoundsException e) {
+            throw new IllegalArgumentException(e.getMessage()
+                    +"\n"+usage);
+        }
+    }
+    
+    /**
+     * Create an application instance.
+     * @param propsPath pathname to an application configuration file
+     * @throws Exception
+     */
+    DeviceCommsApp(String propsPath) throws Exception {
+        super(propsPath);
+    }
+    
+    @Override
+    protected void buildTopology(Topology t) {
+        mqttDevice().getMqttConfig().setClientId(null);
+        MqttStreams mqtt = new MqttStreams(t, () -> mqttDevice().getMqttConfig());
+        if (mode.equals("send")) {
+            String topic = mqttDevice().commandTopic(cmdLabel);
+            JsonObject jo = new JsonObject();
+            jo.addProperty("value", cmdArg);
+            System.out.println("Publishing command: topic="+topic+"  value="+jo);
+            TStream<String> cmd = t.strings(JsonFunctions.asString().apply(jo));
+            mqtt.publish(cmd, topic, QoS.FIRE_AND_FORGET, false/*retain*/);
+            // Hmm... the paho MQTT *non-daemon* threads prevent the app
+            // from exiting after returning from main() following job submit().
+            // Lacking MqttStreams.shutdown() or such...
+            // Delay a bit and then explicitly exit().  Ugh.
+            cmd.sink(tuple -> { 
+                try {
+                    Thread.sleep(3*1000);
+                } catch (Exception e) { }
+                System.exit(0); });
+        }
+        else if (mode.equals("watch")) {
+            String topicFilter = mqttDevice().eventTopic(null);
+            System.out.println("Watching topic filter "+topicFilter);
+            TStream<String> events = mqtt.subscribe(topicFilter, QoS.FIRE_AND_FORGET,
+                    (topic,payload) -> { 
+                        String s = "\n# topic "+topic;
+                        s += "\n" + new String(payload);
+                        return s;
+                    });
+            events.print();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/package-info.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/package-info.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/package-info.java
new file mode 100644
index 0000000..54598cb
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/mqtt/package-info.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Base support for Edgent MQTT based application samples.
+ * <p>
+ * This package builds on {@code org.apache.edgent.samples.apps} providing
+ * additional common capabilities in the area of MQTT based device appliations.
+ */
+package org.apache.edgent.samples.apps.mqtt;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/package-info.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/package-info.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/package-info.java
new file mode 100644
index 0000000..cf62ba2
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/package-info.java
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Support for some more complex Edgent application samples.
+ * <p>
+ * This package provides some commonly needed capabilities particularly in
+ * the area of an external configuration description influence on various
+ * things.
+ * <p>
+ * Focal areas:
+ * <ul>
+ * <li>{@link org.apache.edgent.samples.apps.AbstractApplication} - a base class for
+ *     Edgent applications providing commonly needed features.
+ *     </li>
+ * <li>{@link org.apache.edgent.samples.apps.TopologyProviderFactory} - a configuration
+ *     driven factory for an Edgent topology provider.
+ *     </li>
+ * <li>{@link org.apache.edgent.samples.apps.ApplicationUtilities} - some
+ *     general configuration driven utilities. 
+ *     </li>
+ * <li>{@link org.apache.edgent.samples.apps.JsonTuples} - utilities for wrapping
+ *     sensor samples in a JsonObject and operating on it.
+ *     </li>
+ * </ul>
+ */
+package org.apache.edgent.samples.apps;
\ No newline at end of file