You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:07 UTC

[06/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/Sensor1.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/Sensor1.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/Sensor1.java
new file mode 100644
index 0000000..bdad9f4
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/Sensor1.java
@@ -0,0 +1,286 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps.sensorAnalytics;
+
+import static org.apache.edgent.analytics.math3.stat.Statistic.MAX;
+import static org.apache.edgent.analytics.math3.stat.Statistic.MEAN;
+import static org.apache.edgent.analytics.math3.stat.Statistic.MIN;
+import static org.apache.edgent.analytics.math3.stat.Statistic.STDDEV;
+import static org.apache.edgent.samples.apps.JsonTuples.KEY_ID;
+import static org.apache.edgent.samples.apps.JsonTuples.KEY_READING;
+import static org.apache.edgent.samples.apps.JsonTuples.KEY_TS;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.edgent.analytics.sensors.Range;
+import org.apache.edgent.analytics.sensors.Ranges;
+import org.apache.edgent.connectors.iot.QoS;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.samples.apps.JsonTuples;
+import org.apache.edgent.samples.utils.sensor.PeriodicRandomSensor;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+/**
+ * Analytics for "Sensor1".
+ * <p>
+ * This sample demonstrates some common continuous sensor analytic themes.
+ * <p>
+ * In this case we have a simulated sensor producing 1000 samples per second
+ * of an integer type in the range of 0-255.
+ * <p>
+ * The processing pipeline created is roughly:
+ * <ul>
+ * <li>Batched Data Reduction - reduce the sensor's 1000 samples per second
+ *     down to 1 sample per second simple statistical aggregation of the readings.
+ *     </li>
+ * <li>Compute historical information - each 1hz sample is augmented
+ *     with a 30 second trailing average of the 1hz readings.
+ *     </li>
+ * <li>Threshold detection - each 1hz sample's value is compared
+ *     against a target range and outliers are identified.
+ *     </li>
+ * <li>Local logging - outliers are logged to a local file
+ *     </li>
+ * <li>Publishing results to a MQTT broker:
+ *     <ul>
+ *     <li>when enabled, invdividual outliers are published.</li>
+ *     <li>Every 30 seconds a list of the last 10 outliers is published.</li>
+ *     </ul>
+ *     </li>
+ * </ul>
+ * <p>
+ * The sample also demonstrates:
+ * <ul>
+ * <li>Dynamic configuration control - subscribe to a MQTT broker
+ *     to receive commands to adjust the threshold detection range value. 
+ *     </li>
+ * <li>Generally, the configuration of the processing is driven via an
+ *     external configuration description.
+ *     </li>
+ * <li>Conditional stream tracing - configuration controlled inclusion of tracing.
+ *     </li>
+ * <li>Use of {@link TStream#tag(String...)} to improve information provided by
+ *     the Edgent DevelopmentProvider console.</li>
+ * </ul>
+ */
+public class Sensor1 {
+    private final SensorAnalyticsApplication app;
+    private final Topology t;
+    private final String sensorId = "sensor1";
+
+    public Sensor1(Topology t, SensorAnalyticsApplication app) {
+        this.t = t;
+        this.app = app;
+    }
+    
+    /**
+     * Add the sensor's analytics to the topology.
+     */
+    public void addAnalytics() {
+
+        // Need synchronization for set/get of dynamically changeable values.
+        AtomicReference<Range<Integer>> range = new AtomicReference<>();
+        AtomicReference<Boolean> isPublish1hzOutsideRange = new AtomicReference<>();
+        
+        // Initialize the controls
+        range.set(app.utils().getRangeInteger(sensorId, "outside1hzMeanRange"));
+        isPublish1hzOutsideRange.set(false);
+        
+        // Handle the sensor's device commands
+        app.mqttDevice().commands(commandId("set1hzMeanRangeThreshold"))
+            .tag(commandId("set1hzMeanRangeThresholdCmd"))
+            .sink(jo -> {
+                    Range<Integer> newRange = Ranges.valueOfInteger(getCommandValue(jo));
+                    System.out.println("===== Changing range to "+newRange+" ======");
+                    range.set(newRange);
+                });
+        app.mqttDevice().commands(commandId("setPublish1hzOutsideRange"))
+            .tag(commandId("setPublish1hzOutsideRangeCmd"))
+            .sink(jo -> {
+                    Boolean b = new Boolean(getCommandValue(jo));
+                    System.out.println("===== Changing isPublish1hzOutsideRange to "+b+" ======");
+                    isPublish1hzOutsideRange.set(b);
+                });
+        
+        // Create a raw simulated sensor stream of 1000 tuples/sec.
+        // Each tuple is Pair<Long timestampMsec, sensor-reading (0..255)>.
+        PeriodicRandomSensor simulatedSensorFactory = new PeriodicRandomSensor();
+        TStream<Pair<Long,Integer>> raw1khz = 
+                simulatedSensorFactory.newInteger(t, 1/*periodMsec*/, 255)
+                .tag("raw1khz");
+        traceStream(raw1khz, "raw1khz");
+        
+        // Wrap the raw sensor reading in a JsonObject for convenience.
+        TStream<JsonObject> j1khz = JsonTuples.wrap(raw1khz, sensorId)
+                .tag("j1khz");
+        traceStream(j1khz, "j1khz");
+        
+        // Data-reduction: reduce 1khz samples down to
+        // 1hz aggregate statistics samples.
+        TStream<JsonObject> j1hzStats = j1khz.last(1000, JsonTuples.keyFn())
+                .batch(JsonTuples.statistics(MIN, MAX, MEAN, STDDEV))
+                .tag("1hzStats");
+        
+        // Create a 30 second sliding window of average trailing Mean values
+        // and enrich samples with that information.
+        j1hzStats = j1hzStats.last(30, JsonTuples.keyFn()).aggregate(
+            (samples, key) -> {
+                // enrich and return the most recently added tuple
+                JsonObject jo = samples.get(samples.size()-1);
+                double meanSum = 0;
+                for (JsonObject js : samples) {
+                    meanSum += JsonTuples.getStatistic(js, MEAN).getAsDouble();
+                }
+                jo.addProperty("AvgTrailingMean", Math.round(meanSum / samples.size()));
+                jo.addProperty("AvgTrailingMeanCnt", samples.size());
+                return jo;
+            })
+            .tag("1hzStats.enriched");
+        traceStream(j1hzStats, "j1hzStats");
+
+        // Detect 1hz samples whose MEAN value are
+        // outside the configuration specified range.
+        TStream<JsonObject> outside1hzMeanRange = j1hzStats.filter(
+                sample -> {
+                    int value = JsonTuples.getStatistic(sample, MEAN).getAsInt();
+                    return !range.get().contains(value);
+                })
+                .tag("outside1hzMeanRange");
+        traceStream(outside1hzMeanRange, () -> "outside1hzMeanRange"+range.get()); 
+        
+        // Log every outside1hzMeanRange event
+        app.utils().logStream(outside1hzMeanRange, "ALERT", "outside1hzMeanRange");
+        
+        // Conditionally publish every outside1hzMeanRange event.
+        // Use a pressureReliever to prevent backpressure if the broker
+        // can't be contacted.
+        // TODO enhance MqttDevice with configurable reliever. 
+        app.mqttDevice().events(
+                PlumbingStreams.pressureReliever(
+                    outside1hzMeanRange.filter(tuple -> isPublish1hzOutsideRange.get())
+                                       .tag("outside1hzMeanRangeEvent.conditional"),
+                    tuple -> 0, 30).tag("outside1hzMeanRangeEvent.pressureRelieved"),
+                app.sensorEventId(sensorId, "outside1hzMeanRangeEvent"), QoS.FIRE_AND_FORGET);
+        
+        // Demonstrate periodic publishing of a sliding window if
+        // something changed since it was last published.
+        periodicallyPublishLastNInfo(outside1hzMeanRange, 10, 30,
+                "periodicLastOutsideRangeEvent");
+        
+        // TODO histogram: #alerts over the last 8hr
+
+    }
+    
+    /**
+     * Periodically publish the lastN on a stream.
+     * @param stream tuples to 
+     * @param count sliding window size "lastN"
+     * @param nSec publish frequency
+     * @param event sensor's publish event label
+     */
+    private void periodicallyPublishLastNInfo(TStream<JsonObject> stream, 
+            int count, int nSec, String event) {
+
+        // Demonstrate periodic publishing of a sliding window if
+        // something changed since it was last published.
+
+        // Maintain a sliding window of the last N tuples.
+        // TODO today, windows don't provide "anytime" access to their collection
+        // so maintain our own current copy of the collection that we can
+        // access it when needed.
+        // 
+        List<JsonObject> lastN = Collections.synchronizedList(new ArrayList<>());
+        stream.last(count, JsonTuples.keyFn())
+            .aggregate((samples, key) -> samples)
+            .tag(event+".lastN")
+            .sink(samples -> {
+                    // Capture the new list/window.  
+                    synchronized(lastN) {
+                        lastN.clear();
+                        lastN.addAll(samples);
+                    }
+                });
+    
+        // Publish the lastN (with trimmed down info) every nSec seconds
+        // if anything changed since the last publish.
+        TStream<JsonObject> periodicLastN = 
+                t.poll(() -> 1, nSec, TimeUnit.SECONDS).tag(event+".trigger")
+                .filter(trigger -> !lastN.isEmpty()).tag(event+".changed")
+                .map(trigger -> {
+                    synchronized(lastN) {
+                        // create a single JsonObject with the list
+                        // of reduced-content samples
+                        JsonObject jo = new JsonObject();
+                        jo.addProperty(KEY_ID, sensorId);
+                        jo.addProperty(KEY_TS, System.currentTimeMillis());
+                        jo.addProperty("window", count);
+                        jo.addProperty("pubFreqSec", nSec);
+                        JsonArray ja = new JsonArray();
+                        jo.add("lastN", ja);
+                        for (JsonObject j : lastN) {
+                            JsonObject jo2 = new JsonObject();
+                            ja.add(jo2);
+                            jo2.add(KEY_TS, j.get(KEY_TS));
+                            // reduce size: include only 2 significant digits
+                            jo2.addProperty(KEY_READING, String.format("%.2f", 
+                                JsonTuples.getStatistic(j, MEAN).getAsDouble()));
+                        }
+                        lastN.clear();
+                        return jo;
+                    }
+                })
+                .tag(event);
+
+        traceStream(periodicLastN, event);
+
+        // Use a pressureReliever to prevent backpressure if the broker
+        // can't be contacted.
+        // TODO enhance MqttDevice with configurable reliever. 
+        app.mqttDevice().events(
+                PlumbingStreams.pressureReliever(periodicLastN, tuple -> 0, 30)
+                    .tag(event+".pressureRelieved"),
+                app.sensorEventId(sensorId, event), QoS.FIRE_AND_FORGET);
+    }
+    
+    private String commandId(String commandId) {
+        return app.commandId(sensorId, commandId);
+    }
+    
+    private String getCommandValue(JsonObject jo) {
+        return app.getCommandValueString(jo);
+    }
+    
+    private <T> TStream<T> traceStream(TStream<T> stream, String label) {
+        return traceStream(stream, () -> label); 
+    }
+    
+    private <T> TStream<T> traceStream(TStream<T> stream, Supplier<String> label) {
+        return app.utils().traceStream(stream, sensorId, label); 
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java
new file mode 100644
index 0000000..061bf95
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/SensorAnalyticsApplication.java
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.apps.sensorAnalytics;
+
+import org.apache.edgent.samples.apps.mqtt.AbstractMqttApplication;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * A sample application demonstrating some common sensor analytic processing
+ * themes.
+ */
+public class SensorAnalyticsApplication extends AbstractMqttApplication {
+    
+    public static void main(String[] args) throws Exception {
+        if (args.length != 1)
+            throw new Exception("missing pathname to application properties file");
+        
+        SensorAnalyticsApplication application = new SensorAnalyticsApplication(args[0]);
+        
+        application.run();
+    }
+    
+    /**
+     * Create an application instance.
+     * @param propsPath pathname to an application configuration file
+     * @throws Exception
+     */
+    SensorAnalyticsApplication(String propsPath) throws Exception {
+        super(propsPath);
+    }
+    
+    @Override
+    protected void buildTopology(Topology t) {
+        
+        // Add the "sensor1" analytics to the topology
+        new Sensor1(t, this).addAnalytics();
+        
+        // TODO Add the "sensor2" analytics to the topology
+        // TODO Add the "sensor3" analytics to the topology
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/package-info.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/package-info.java b/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/package-info.java
new file mode 100644
index 0000000..6ce82b6
--- /dev/null
+++ b/samples/apps/src/main/java/org/apache/edgent/samples/apps/sensorAnalytics/package-info.java
@@ -0,0 +1,164 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * The Sensor Analytics sample application demonstrates some common 
+ * continuous sensor analytic application themes.
+ * See {@link org.apache.edgent.samples.apps.sensorAnalytics.Sensor1 Sensor1} for the
+ * core of the analytics processing and  
+ * {@link org.apache.edgent.samples.apps.sensorAnalytics.SensorAnalyticsApplication
+ * SensorAnalyticsApplication}
+ * for the main program.
+ * <p>
+ * The themes include:
+ * <ul>
+ * <li>Batched Data Reduction - reducing higher frequency sensor reading
+ *     samples down to a lower frequency using statistical aggregations
+ *     of the raw readings.
+ *     </li>
+ * <li>Computing continuous historical statistics such as a
+ *     30 second trailing average of sensor readings.
+ *     </li>
+ * <li>Outlier / threshold detection against a configurable range</li>
+ * <li>Local logging of stream tuples</li>
+ * <li>Publishing analytic results to an MQTT broker</li>
+ * <li>Dynamic configuration control - subscribing to a MQTT broker
+ *     to receive commands to adjust the threshold detection range value. 
+ *     </li>
+ * <li>Generally, the configuration of the processing is driven via an
+ *     external configuration description.
+ *     </li>
+ * <li>Conditional stream tracing - configuration controlled inclusion of tracing.
+ *     </li>
+ * </ul>
+ * 
+ * <h2>Prerequisites:</h2>
+ * <p>
+ * The default configuration is for a local MQTT broker.
+ * A good resource is <a href="http://mosquitto.org">mosquitto.org</a>
+ * if you want to download and setup your own MQTT broker.
+ * Or you can use some other broker available in your environment.
+ * <p>
+ * Alternatively, there are some public MQTT brokers available to experiment with.
+ * Their availability status isn't guaranteed.  If you're unable to connect
+ * to the broker, it's likely that it isn't up or your firewalls don't
+ * allow you to connect.  DO NOT PUBLISH ANYTHING SENSITIVE - anyone
+ * can be listing.  A couple of public broker locations are noted
+ * in the application's properties file.
+ * <p>
+ * The default {@code mqttDevice.topic.prefix} value, used by default in 
+ * generated MQTT topic values and MQTT clientId, contains the user's
+ * local login id.  The SensorAnalytics sample application does not have any
+ * other sensitive information.
+ * <p>
+ * Edit {@code <edgent-release>/java8/scripts/apps/sensorAnalytics/sensoranalytics.properties}
+ * to change the broker location or topic prefix.
+ * 
+ * <h2>Application output:</h2>
+ * <p>
+ * The application periodically (every 30sec), publishes a list of
+ * the last 10 outliers to MQTT.  When enabled, it also publishes 
+ * full details of individual outliers as they occur.
+ * It also subscribes to MQTT topics for commands to dynamically change the
+ * threshold range and whether to publish individual outliers.
+ * <p>
+ * All MQTT configuration information, including topic patterns,
+ * are in the application.properties file.
+ * <p>
+ * The application logs outlier events in local files.  The actual location
+ * is specified in the application.properties file.
+ * <p>
+ * The application generates some output on stdout and stderr.
+ * The information includes:
+ * <ul>
+ * <li>MQTT device info. Lines 1 through 5 in the sample console output below.</li>
+ * <li>URL for the Edgent development console.  Line 6.</li>
+ * <li>Trace of the outlier event stream. Line 7.
+ *     The output is a label, which includes the active threshold range,
+ *     followed by the event's JSON.
+ *     These are the events that will also be logged and conditionally published
+ *     as well as included in the periodic lastN info published every 30sec.
+ *     </li>
+ * <li>Announcement when a "change threshold" or "enable publish of 1khz outliers"
+ *     command is received and processed.
+ *     Line 8 and 9. 
+ *     </li>
+ * <li>At this time some INFO trace output from the MQTT connector</li>
+ * <li>At this time some INFO trace output from the File connector</li>
+ * </ul>
+ * Sample console output:
+ * <pre>{@code
+ * [1] MqttDevice serverURLs [tcp://localhost:1883]
+ * [2] MqttDevice clientId id/012345
+ * [3] MqttDevice deviceId 012345
+ * [4] MqttDevice event topic pattern id/012345/evt/+/fmt/json
+ * [5] MqttDevice command topic pattern id/012345/cmd/+/fmt/json
+ * [6] Edgent Console URL for the job: http://localhost:57324/console
+ * [7] sensor1.outside1hzMeanRange[124..129]: {"id":"sensor1","reading":{"N":1000,"MIN":0.0,"MAX":254.0,"MEAN":130.23200000000006,"STDDEV":75.5535473324351},"msec":1454623874408,"agg.begin.msec":1454623873410,"agg.count":1000,"AvgTrailingMean":128,"AvgTrailingMeanCnt":4}
+ * ...
+ * [8] ===== Changing range to [125..127] ======
+ * sensor1.outside1hzMeanRange[125..127]: {"id":"sensor1","reading":{"N":1000,"MIN":0.0,"MAX":254.0,"MEAN":129.00099999999978,"STDDEV":74.3076080870567},"msec":1454624142419,"agg.begin.msec":1454624141420,"agg.count":1000,"AvgTrailingMean":127,"AvgTrailingMeanCnt":30}
+ * [9] ===== Changing isPublish1hzOutsideRange to true ======
+ * ...
+ * }</pre>
+ * 
+ * <h2>Running, observing and controlling the application:</h2>
+ * <pre>{@code
+ * $ ./runSensorAnalytics.sh
+ * }</pre>
+ * <p>
+ * To observe the locally logged outlier events:
+ * <pre>{@code
+ * $ tail -f /tmp/SensorAnalytics/logs/.outside1hzMeanRange
+ * }</pre>
+ * <p>
+ * To observe the events that are getting published to MQTT:
+ * <pre>{@code
+ * $ ./runDeviceComms.sh watch
+ * }</pre>
+ * <p>
+ * To change the outlier threshold setting:
+ * <br>The command value is the new range string: {@code [<lowerBound>..<upperBound>]}.
+ * <pre>{@code
+ * $ ./runDeviceComms.sh send sensor1.set1hzMeanRangeThreshold "[125..127]"
+ * }</pre>
+ * <p>
+ * To change the "publish individual 1hz outliers" control:
+ * <pre>{@code
+ * $ ./runDeviceComms.sh send sensor1.setPublish1hzOutsideRange true
+ * }</pre>
+ * 
+ * <h3>Alternative MQTT clients</h3>
+ * You can use any MQTT client but you will have to specify the 
+ * MQTT server, the event topics to watch / subscribe to, and the command topics
+ * and JSON for publish commands.  The MqttDevice output above provides most
+ * of the necessary information.
+ * <p>
+ * For example, the {@code mosquitto_pub} and
+ * {@code mosquitto_sub} commands equivalent to the above runDeviceComms.sh
+ * commands are:
+ * <pre>{@code
+ * # Watch the device's event topics
+ * $ /usr/local/bin/mosquitto_sub -t id/012345/evt/+/fmt/json
+ * # change the outlier threshold setting
+ * $ /usr/local/bin/mosquitto_pub -m '{"value":"[125..127]"}' -t id/012345/cmd/sensor1.set1hzMeanRangeThreshold/fmt/json
+ * # change the "publish individual 1hz outliers" control
+ * $ /usr/local/bin/mosquitto_pub -m '{"value":"true"}' -t id/012345/cmd/sensor1.setPublish1hzOutsideRange/fmt/json
+ * }</pre>
+ */
+package org.apache.edgent.samples.apps.sensorAnalytics;

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/MsgSupplier.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/MsgSupplier.java b/samples/connectors/src/main/java/edgent/samples/connectors/MsgSupplier.java
deleted file mode 100644
index 10c8f89..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/MsgSupplier.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors;
-
-import edgent.function.Supplier;
-
-/**
- * A Supplier&lt;String&gt; for creating sample messages to publish.
- */
-public class MsgSupplier implements Supplier<String> {
-    private static final long serialVersionUID = 1L;
-    private final int maxCnt;
-    private int cnt;
-    private boolean done;
-    
-    public MsgSupplier(int maxCnt) {
-        this.maxCnt = maxCnt;
-    }
-
-    @Override
-    public synchronized String get() {
-        ++cnt;
-        if (maxCnt >= 0 && cnt >= maxCnt) {
-            if (!done) {
-                done = true;
-                System.out.println("poll: no more messages to generate.");
-            }
-            return null;
-        }
-        String msg = String.format("Message-%d from %s", cnt, Util.simpleTS());
-        System.out.println("poll generated msg to publish: " + msg);
-        return msg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/Options.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/Options.java b/samples/connectors/src/main/java/edgent/samples/connectors/Options.java
deleted file mode 100644
index 0c8f2e5..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/Options.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import edgent.function.Function;
-
-/**
- * Simple command option processor.
- */
-public class Options {
-    private static final Map<String,Function<String,?>> handlers = new HashMap<>();
-    private static final Map<String,Object> options = new HashMap<>();
-    private static final Map<String,Object> defaults = new HashMap<>();
-    
-    public <T> void addHandler(String opt, Function<String,T> cvtFn) {
-        addHandler(opt, cvtFn, null);
-    }
-    
-    public <T> void addHandler(String opt, Function<String,T> cvtFn, T dflt) {
-        handlers.put(opt, cvtFn);
-        if (dflt != null)
-            defaults.put(opt, dflt);
-    }
-    
-    public void processArgs(String[] args) {
-        for (Map.Entry<String,Function<String,?>> e : handlers.entrySet()) {
-            handleOpt(e.getKey(), e.getValue(), args);
-        }
-
-        for (String arg : args) {
-            String[] item = arg.split("=");
-            if (!handlers.containsKey(item[0]))
-                throw new IllegalArgumentException("Unrecognized argument '"+arg+"'");
-        }
-    }
-    
-    private void handleOpt(String opt, Function<String,?> cvtFn, String[] args) {
-        String v = getArg(cvtFn!=null ? opt : opt+"=true", args);
-        if (v != null)
-            options.put(opt, cvtFn==null ? true : cvtFn.apply(v));
-        else if (defaults.get(opt) != null)
-            options.put(opt, defaults.get(opt));
-    }
-
-    public <T> T get(String opt) {
-        return get(opt, null);
-    }
-    
-    @SuppressWarnings("unchecked")
-    public <T> T get(String opt, T dflt) {
-        return options.get(opt) == null ? dflt : (T)options.get(opt); 
-    }
-    
-    public Set<Map.Entry<String,Object>> getAll() {
-        return Collections.unmodifiableSet(options.entrySet());
-    }
-    
-    public void put(String opt, Object value) {
-        options.put(opt, value);
-    }
-    
-    private String getArg(String item, String[] args) {
-        String[] itemParts = item.split("=");
-        if (itemParts.length>1)
-            item = itemParts[0];
-        for (String arg : args) {
-            String[] parts = arg.split("=");
-            if (item.equals(parts[0])) {
-                if (parts.length > 1)
-                    return parts[1];
-                else
-                    return itemParts.length > 1 ? itemParts[1] : parts[1];
-            }
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/Util.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/Util.java b/samples/connectors/src/main/java/edgent/samples/connectors/Util.java
deleted file mode 100644
index ed095bb..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/Util.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-
-import edgent.execution.Job;
-
-/**
- * Utilities for connector samples.
- */
-public class Util {
-
-    /**
-     * Generate a simple timestamp with the form {@code HH:mm:ss.SSS}
-     * @return the timestamp
-     */
-    public static String simpleTS() {
-        return new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
-    }
-
-    
-    /**
-     * Wait for the job to reach the specified state.
-     * <p>
-     * A placeholder till GraphJob directly supports awaitState()?
-     * @param job the job
-     * @param state the state to wait for
-     * @param timeout specify -1 to wait forever (until interrupted)
-     * @param unit may be null if timeout is -1
-     * @return true if the state was reached, false otherwise: the time limit
-     * was reached of the thread was interrupted.
-     */
-    public static boolean awaitState(Job job, Job.State state, long timeout, TimeUnit unit) {
-        long endWait = -1;
-        if (timeout != -1) {
-            endWait = System.currentTimeMillis()
-                        + unit.toMillis(timeout);
-        }
-        while (true) {
-            Job.State curState = job.getCurrentState();
-            if (curState == state)
-                return true;
-            if (endWait != -1) {
-                long now = System.currentTimeMillis();
-                if (now >= endWait)
-                    return false;
-            }
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                return false;
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Cmd.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Cmd.java b/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Cmd.java
deleted file mode 100644
index 07d205a..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Cmd.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.elm327;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import com.google.gson.JsonObject;
-
-/**
- * ELM327 and OBD-II command interface.
- *
- */
-public interface Cmd {
-    /**
-     * Key ({@value}) for PID identifier in JSON result.
-     */
-    String PID = "pid";
-
-    /**
-     * Key ({@value}) for timestamp in JSON result. Timestamp value is the
-     * number of milliseconds since the 1907 epoch.
-     */
-    String TS = "ts";
-    
-    /**
-     * Key ({@value}) for the returned value in JSON result.
-     * May not be present.
-     */
-    String VALUE = "value";
-
-    /**
-     * How the command is written to the serial port.
-     * 
-     * @param out
-     *            OutputStream to write bytes to.
-     * @throws IOException
-     *             Exception writing bytes.
-     */
-    void writeCmd(OutputStream out) throws IOException;
-
-    /**
-     * Process the reply into a result.
-     * 
-     * @param result
-     *            JSON object to populate with the result.
-     * @param reply
-     *            Bytes that were returned from the command execution.
-     *            
-     * @return {@code true} result is valid, {@code false} otherwise.
-     */
-    boolean result(JsonObject result, byte[] reply);
-
-    /**
-     * Unique identifier of the command.
-     * 
-     * @return Unique identifier of the command.
-     */
-    String id();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Elm327Cmds.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Elm327Cmds.java b/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Elm327Cmds.java
deleted file mode 100644
index fdae6c3..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Elm327Cmds.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.elm327;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.serial.SerialDevice;
-import edgent.samples.connectors.elm327.runtime.CommandExecutor;
-
-/**
- * ELM327 commands.
- * 
- * 
- */
-public enum Elm327Cmds implements Cmd {
-
-    INIT("ATZ"),
-    ECHO_OFF("ATE0"),
-    PROTOCOL_3("ATSP3"),
-    PROTOCOL_5("ATSP5"),
-    BYPASS_INIT("ATBI"),
-    FAST_INIT("ATFI"),
-    SLOW_INIT("ATSI"),;
-
-    private byte[] cmd;
-
-    Elm327Cmds(String code) {
-        cmd = (code + "\r").getBytes(StandardCharsets.US_ASCII);
-    }
-
-    @Override
-    public void writeCmd(OutputStream out) throws IOException {
-        out.write(cmd);
-    }
-
-    @Override
-    public boolean result(JsonObject result, byte[] data) {
-        return true;
-    }
-
-    @Override
-    public String id() {
-        return name();
-    }
-    
-    /**
-     * Initialize the ELM327 to a specific protocol.
-     * @param device Serial device the ELM327 is connected to.
-     * @param protocol OBD-II protocol to initialize to.
-     */
-    public static void initializeProtocol(SerialDevice device, Elm327Cmds protocol) {
-        device.setInitializer(port -> CommandExecutor.initialize(protocol, port.getOutput(), port.getInput()));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Elm327Streams.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Elm327Streams.java b/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Elm327Streams.java
deleted file mode 100644
index 18900fe..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Elm327Streams.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.elm327;
-
-import java.util.concurrent.TimeUnit;
-
-import com.google.gson.JsonArray;
-
-import edgent.connectors.serial.SerialDevice;
-import edgent.function.Supplier;
-import edgent.samples.connectors.elm327.runtime.CommandExecutor;
-import edgent.topology.TStream;
-
-/**
- * Streams fetching OBD-II data from an ELM327 through
- * a serial device.
- *
- * @see <a href="https://en.wikipedia.org/wiki/ELM327">ELM327</a>
- */
-public class Elm327Streams {
-	
-    /**
-     * Periodically execute a number of ELM327 commands.
-     * Each tuple on the returned stream is a JSON array containing
-     * the result for each command.
-     * <BR>
-     * Each result is a JSON object containing the
-     * {@link Cmd#id() command identifier} with key {@link Cmd#PID pid}
-     * and any result set by the individual command, typically with
-     * the key {@link Cmd#VALUE value}.
-     * 
-     * @param device Serial device the ELM327 is connected to.
-     * @param period Period to poll.
-     * @param unit Unit of {@code period}.
-     * @param cmds Commands to execute.
-     * @return Stream containing the results of the command exections.
-     */
-	public static TStream<JsonArray> poll(SerialDevice device, long period, TimeUnit unit, Cmd ... cmds) {
-		
-		Supplier<JsonArray> data = device.getSource(
-				port ->
-		{
-			JsonArray array = new JsonArray();
-			for (Cmd cmd : cmds) {
-				array.add(CommandExecutor.execute(cmd, port.getOutput(), port.getInput()));
-			}
-			return array;
-			
-		});
-		
-		return device.topology().poll(data, period, unit);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Pids01.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Pids01.java b/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Pids01.java
deleted file mode 100644
index e1a8b25..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/Pids01.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.elm327;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-
-import com.google.gson.JsonObject;
-
-import edgent.samples.connectors.elm327.runtime.CommandExecutor;
-
-/**
- * OBD-II Standard Mode 01 Pids.
- *
- * 
- * @see <a href="https://en.wikipedia.org/wiki/OBD-II_PIDs#Mode_01">OBD-II Mode 01 Pids</a>
- */
-public enum Pids01 implements Cmd {
-    
-    /**
-     * Get the list of available PIDs.
-     */
-	AVAILABLE_PIDS("00"),
-	
-	/**
-	 * Engine coolant temperature in �C.
-	 */
-	ENGINE_COOLANT_TEMP("05") {
-		@Override
-		protected boolean decode(JsonObject result, byte[] reply) {
-			
-			int[] binary = CommandExecutor.binary(reply, 4, 2);
-			
-			int c = binary[0] - 40;
-			result.addProperty(VALUE, c);
-			
-			return true;
-		}
-	},
-
-	/**
-	 * Engine speed in rpm.
-	 */
-	RPM("0C") {
-		@Override
-		protected boolean decode(JsonObject result, byte[] reply) {
-			
-			int[] binary = CommandExecutor.binary(reply, 4, 4);
-			int rpm = ((binary[0] * 256) + binary[1])/4;
-			result.addProperty(VALUE, rpm);
-			
-			return true;
-		}
-	},
-	
-	/**
-	 * Vehicle speed in km/h.
-	 */
-	SPEED("0D"){
-		@Override
-		protected boolean decode(JsonObject result, byte[] reply) {
-			
-			int[] binary = CommandExecutor.binary(reply, 4, 2);
-			
-			result.addProperty(VALUE, binary[0]);
-			
-			return true;
-		}
-	},
-	
-	/**
-     * Engine air intake temperature in �C.
-     */
-	AIR_INTAKE_TEMP("0F"){
-		@Override
-		protected boolean decode(JsonObject result, byte[] reply) {
-			
-			int[] binary = CommandExecutor.binary(reply, 4, 2);
-			
-			int c = binary[0] - 40;
-			result.addProperty(VALUE, c);
-			
-			return true;
-		}
-	},
-	;
-
-    private final String pid;
-	private final byte[] cmd;
-	
-	Pids01(String pid) {
-		this.pid = pid;
-		cmd = ("01" + pid + "1\r").getBytes(StandardCharsets.US_ASCII);
-	}
-	
-	public String id() {
-		return pid;
-	}
-	
-	@Override
-	public void writeCmd(OutputStream out) throws IOException {
-		out.write(cmd);
-	}
-	@Override
-	public final boolean result(JsonObject result, byte[] data) {
-		return validateReply(data) && decode(result, data);
-	}
-	 boolean decode(JsonObject result, byte[] data) {
-		 return true;
-	 }
-	
-	boolean validateReply(byte[] reply) {
-		if (reply[0] != '4')
-			return false;
-		if (reply[1] != '1')
-			return false;
-		if (reply[2] != pid.charAt(0))
-			return false;
-		if (reply[3] != pid.charAt(1))
-			return false;
-		
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/elm327/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/package-info.java b/samples/connectors/src/main/java/edgent/samples/connectors/elm327/package-info.java
deleted file mode 100644
index 8fa5401..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/package-info.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * OBD-II protocol sample using ELM327.
- * 
- * ELM327 devices allow connectivity to a vehicle's OBD-II information.
- *
- * @see <a href="https://en.wikipedia.org/wiki/OBD-II">OBD-II</a>
- * @see <a href="https://en.wikipedia.org/wiki/ELM327">ELM327</a>
- */
-package edgent.samples.connectors.elm327;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/elm327/runtime/CommandExecutor.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/runtime/CommandExecutor.java b/samples/connectors/src/main/java/edgent/samples/connectors/elm327/runtime/CommandExecutor.java
deleted file mode 100644
index 755e8a9..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/elm327/runtime/CommandExecutor.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.elm327.runtime;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import com.google.gson.JsonObject;
-
-import edgent.samples.connectors.elm327.Cmd;
-import edgent.samples.connectors.elm327.Elm327Cmds;
-
-/**
- * Runtime execution of ELM327 &amp; OBD-II commands.
- *
- */
-public class CommandExecutor {
-
-    public static int[] binary(byte[] reply, int offset, int length) {
-        int[] binary = new int[length / 2];
-        for (int i = 0; i < binary.length; i++) {
-            int h = Character.digit(reply[offset++], 16);
-            int l = Character.digit(reply[offset++], 16);
-            binary[i] = ((h * 16) + l);
-        }
-        return binary;
-    }
-
-    public static void initialize(Cmd protocol, OutputStream out, InputStream in) {
-        try {
-
-            executeUntilOK(10, Elm327Cmds.INIT, out, in);
-            Thread.sleep(1000);
-
-            executeUntilOK(1, Elm327Cmds.ECHO_OFF, out, in);
-
-            executeUntilOK(1, protocol, out, in);
-            executeUntilOK(1, Elm327Cmds.SLOW_INIT, out, in);
-            Thread.sleep(1000);
-
-        } catch (Exception ioe) {
-            throw new RuntimeException(ioe);
-        }
-    }
-
-    private static boolean readUntilPrompt(InputStream in, ByteArrayOutputStream bytes) throws IOException {
-        bytes.reset();
-        for (;;) {
-            int b = in.read();
-            if (b == -1)
-                return false;
-            if (b == ' ')
-                continue;
-            if (b == '\r')
-                continue;
-            if (b == '>')
-                return true;
-
-            bytes.write(b);
-        }
-    }
-
-    public static JsonObject executeUntilOK(int n, Cmd cmd, OutputStream out, InputStream in) throws IOException {
-        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(16)) {
-            for (int i = 0; i < n; i++) {
-                cmd.writeCmd(out);
-                out.flush();
-
-                if (!readUntilPrompt(in, bytes))
-                    continue;
-
-                byte[] reply = bytes.toByteArray();
-                JsonObject j = new JsonObject();
-                if (cmd.result(j, reply))
-                    return j;
-                break;
-            }
-        }
-        throw new IllegalStateException("Could not execute command:" + cmd);
-    }
-
-    public static JsonObject execute(Cmd cmd, OutputStream out, InputStream in) {
-        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(16)) {
-            cmd.writeCmd(out);
-            out.flush();
-
-            JsonObject result = new JsonObject();
-            result.addProperty(Cmd.PID, cmd.id());
-            result.addProperty(Cmd.TS, System.currentTimeMillis());
-
-            readUntilPrompt(in, bytes);
-
-            cmd.result(result, bytes.toByteArray());
-
-            return result;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/file/FileReaderApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/file/FileReaderApp.java b/samples/connectors/src/main/java/edgent/samples/connectors/file/FileReaderApp.java
deleted file mode 100644
index d460fb0..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/file/FileReaderApp.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.file;
-
-import java.io.File;
-
-import edgent.connectors.file.FileStreams;
-import edgent.console.server.HttpServer;
-import edgent.providers.development.DevelopmentProvider;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-
-/**
- * Watch a directory for files and convert their contents into a stream.
- */
-public class FileReaderApp {
-    private final String directory;
-    private static final String baseLeafname = "FileSample";
-
-    public static void main(String[] args) throws Exception {
-        if (args.length != 1)
-            throw new Exception("missing pathname to an existing directory");
-        FileReaderApp reader = new FileReaderApp(args[0]);
-        reader.run();
-    }
-   
-    /**
-     * 
-     * @param directory an existing directory to watch for file
-     */
-    public FileReaderApp(String directory) {
-        File dir = new File(directory);
-        if (!dir.exists())
-            throw new IllegalArgumentException("directory doesn't exist");
-        this.directory = directory;
-    }
-    
-    public void run() throws Exception {
-        DevelopmentProvider tp = new DevelopmentProvider();
-        
-        // build the application / topology
-        
-        Topology t = tp.newTopology("FileSample consumer");
-
-        // watch for files
-        TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> directory);
-        
-        // create a stream containing the files' contents.
-        // use a preFn to include a separator in the results.
-        // use a postFn to delete the file once its been processed.
-        TStream<String> contents = FileStreams.textFileReader(pathnames,
-                tuple -> "<PRE-FUNCTION> "+tuple, 
-                (tuple,exception) -> {
-                    // exercise a little caution in case the user pointed
-                    // us at a directory with other things in it
-                    if (tuple.contains("/"+baseLeafname+"_")) { 
-                        new File(tuple).delete();
-                    }
-                    return null;
-                });
-        
-        // print out what's being read
-        contents.print();
-        
-        // run the application / topology
-        System.out.println("starting the reader watching directory " + directory);
-        System.out.println("Console URL for the job: "
-                + tp.getServices().getService(HttpServer.class).getConsoleUrl());
-        tp.submit(t);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/file/FileWriterApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/file/FileWriterApp.java b/samples/connectors/src/main/java/edgent/samples/connectors/file/FileWriterApp.java
deleted file mode 100644
index 94c6eec..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/file/FileWriterApp.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.file;
-
-import java.io.File;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import edgent.connectors.file.FileStreams;
-import edgent.connectors.file.FileWriterCycleConfig;
-import edgent.connectors.file.FileWriterFlushConfig;
-import edgent.connectors.file.FileWriterPolicy;
-import edgent.connectors.file.FileWriterRetentionConfig;
-import edgent.console.server.HttpServer;
-import edgent.providers.development.DevelopmentProvider;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-
-/**
- * Write a TStream&lt;String&gt; to files.
- */
-public class FileWriterApp {
-    private final String directory;
-    private final String basePathname;
-    private static final String baseLeafname = "FileSample";
-    
-    public static void main(String[] args) throws Exception {
-        if (args.length != 1)
-            throw new Exception("missing pathname to an existing directory");
-        FileWriterApp writer = new FileWriterApp(args[0]);
-        writer.run();
-    }
-    
-    /**
-     * 
-     * @param directory an existing directory to create files in
-     */
-    public FileWriterApp(String directory) {
-        File dir = new File(directory);
-        if (!dir.exists())
-            throw new IllegalArgumentException("directory doesn't exist");
-        this.directory = directory;
-        basePathname = directory+"/"+baseLeafname;
-    }
-    
-    public void run() throws Exception {
-        DevelopmentProvider tp = new DevelopmentProvider();
-        
-        // build the application / topology
-        
-        Topology t = tp.newTopology("FileSample producer");
-        
-        FileWriterPolicy<String> policy = new FileWriterPolicy<String>(
-                FileWriterFlushConfig.newImplicitConfig(),
-                FileWriterCycleConfig.newCountBasedConfig(5),
-                FileWriterRetentionConfig.newFileCountBasedConfig(3));
-
-        // create a tuple stream to write out
-        AtomicInteger cnt = new AtomicInteger();
-        TStream<String> stream = t.poll(() -> {
-                String str = String.format("sample tuple %d %s",
-                        cnt.incrementAndGet(), new Date().toString());
-                System.out.println("created tuple: "+str);
-                return str;
-            }, 1, TimeUnit.SECONDS);
-        
-        // write the stream
-        FileStreams.textFileWriter(stream, () -> basePathname, () -> policy);
-        
-        // run the application / topology
-        System.out.println("starting the producer writing to directory " + directory);
-        System.out.println("Console URL for the job: "
-                + tp.getServices().getService(HttpServer.class).getConsoleUrl());
-        tp.submit(t);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/file/README
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/file/README b/samples/connectors/src/main/java/edgent/samples/connectors/file/README
deleted file mode 100644
index 4477518..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/file/README
+++ /dev/null
@@ -1,11 +0,0 @@
-Sample File Streams connector topology applications.
-
-The file writer application writes a stream's tuples to files.
-
-The file reader application watches a directory for files and reads their
-contents into a stream of tuples.
-
-see scripts/connectors/file/README to run them
-
-FileWriterApp.java - the writer application topology
-FileReaderApp.java - the reader application topology

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/file/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/file/package-info.java b/samples/connectors/src/main/java/edgent/samples/connectors/file/package-info.java
deleted file mode 100644
index 641a0e0..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/file/package-info.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Samples showing use of the 
- * <a href="{@docRoot}/edgent/connectors/file/package-summary.html">
- *     File stream connector</a>.
- * <p>
- * See &lt;edgent-release&gt;/scripts/connectors/file/README to run the samples.
- * <p>
- * The following samples are provided:
- * <ul>
- * <li>FileReaderApp.java - a simple directory watcher and file reader application topology</li>
- * <li>FileWriterApp.java - a simple file writer application topology</li>
- * </ul>
- */
-package edgent.samples.connectors.file;

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/iotf/IotfQuickstart.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/iotf/IotfQuickstart.java b/samples/connectors/src/main/java/edgent/samples/connectors/iotf/IotfQuickstart.java
deleted file mode 100644
index bda805a..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/iotf/IotfQuickstart.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.iotf;
-
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.iot.IotDevice;
-import edgent.connectors.iot.QoS;
-import edgent.connectors.iotf.IotfDevice;
-import edgent.providers.direct.DirectProvider;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-
-/**
- * IBM Watson IoT Platform Quickstart sample.
- * Submits a JSON device event every second using the
- * same format as the Quickstart device simulator,
- * with keys {@code temp}, {@code humidity}  and {@code objectTemp}
- * and random values.
- * <P>
- * The device type is {@code iotsamples-edgent} and a random
- * device identifier is generated. Both are printed out when
- * the application starts.
- * </P>
- * A URL is also printed that allows viewing of the data
- * as it received by the Quickstart service.
- */
-public class IotfQuickstart {
-
-    public static void main(String[] args) {
-
-        DirectProvider tp = new DirectProvider();
-        Topology topology = tp.newTopology("IotfQuickstart");
-        
-        // Declare a connection to IoTF Quickstart service
-        String deviceId = "qs" + Long.toHexString(new Random().nextLong());
-        IotDevice device = IotfDevice.quickstart(topology, deviceId);
-        
-        System.out.println("Quickstart device type:" + IotfDevice.QUICKSTART_DEVICE_TYPE);
-        System.out.println("Quickstart device id  :" + deviceId);
-        System.out.println("https://quickstart.internetofthings.ibmcloud.com/#/device/"
-             + deviceId);
-             
-        Random r = new Random();
-        TStream<double[]> raw = topology.poll(() -> {
-            double[]  v = new double[3];
-            
-            v[0] = r.nextGaussian() * 10.0 + 40.0;
-            v[1] = r.nextGaussian() * 10.0 + 50.0;
-            v[2] = r.nextGaussian() * 10.0 + 60.0;
-            
-            return v;
-        }, 1, TimeUnit.SECONDS);
-        
-        TStream<JsonObject> json = raw.map(v -> {
-            JsonObject j = new JsonObject();
-            j.addProperty("temp", v[0]);
-            j.addProperty("humidity", v[1]);
-            j.addProperty("objectTemp", v[2]);
-            return j;
-        });
-        
-        device.events(json, "sensors", QoS.FIRE_AND_FORGET);
-
-        tp.submit(topology);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/iotf/IotfSensors.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/iotf/IotfSensors.java b/samples/connectors/src/main/java/edgent/samples/connectors/iotf/IotfSensors.java
deleted file mode 100644
index 5715719..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/iotf/IotfSensors.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.samples.connectors.iotf;
-
-import java.io.File;
-import java.util.concurrent.TimeUnit;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.iot.HeartBeat;
-import edgent.connectors.iot.IotDevice;
-import edgent.connectors.iot.QoS;
-import edgent.connectors.iotf.IotfDevice;
-import edgent.providers.direct.DirectProvider;
-import edgent.providers.direct.DirectTopology;
-import edgent.samples.topology.SensorsAggregates;
-import edgent.topology.TStream;
-
-/**
- * Sample sending sensor device events to IBM Watson IoT Platform. <BR>
- * Simulates a couple of bursty sensors and sends the readings from the sensors
- * to IBM Watson IoT Platform as device events with id {@code sensors}. <BR>
- * Subscribes to device commands with identifier {@code display}.
- * <P>
- * In addition a device event with id {@code hearbeat} is sent
- * every minute. This ensure a connection attempt to IBM Watson IoT Platform
- * is made immediately rather than waiting for a bursty sensor to become
- * active.
- * <P>
- * This sample requires an IBM Watson IoT Platform service and a device configuration.
- * The device configuration is read from the file {@code device.cfg} in the
- * current directory. <BR>
- * In order to see commands send from IBM Watson IoT Platform
- * there must be an analytic application
- * that sends commands with the identifier {@code display}.
- * </P>
- */
-public class IotfSensors {
-
-    /**
-     * Run the IotfSensors application.
-     * 
-     * Takes a single argument that is the path to the
-     * device configuration file containing the connection
-     * authentication information.
-     * 
-     * @param args Must contain the path to the device configuration file.
-     * 
-     * @see IotfDevice#IotfDevice(edgent.topology.Topology, File)
-     */
-    public static void main(String[] args) {
-        
-        String deviceCfg = args[0];
-
-        DirectProvider tp = new DirectProvider();
-        DirectTopology topology = tp.newTopology("IotfSensors");
-
-        // Declare a connection to IoTF
-        IotDevice device = new IotfDevice(topology, new File(deviceCfg));
-
-        // Simulated sensors for this device.
-        simulatedSensors(device, true);
-        
-        // Heartbeat
-        heartBeat(device, true);
-
-        // Subscribe to commands of id "display" for this
-        // device and print them to standard out
-        displayMessages(device, true);
-
-        tp.submit(topology);
-    }
-
-
-    /**
-     * Simulate two bursty sensors and send the readings as IoTF device events
-     * with an identifier of {@code sensors}.
-     * 
-     * @param device
-     *            IoT device
-     * @param print
-     *            True if the data submitted as events should also be printed to
-     *            standard out.
-     */
-    public static void simulatedSensors(IotDevice device, boolean print) {
-
-        TStream<JsonObject> sensors = SensorsAggregates.sensorsAB(device.topology());
-        if (print)
-            sensors.print();
-
-        // Send the device streams as IoTF device events
-        // with event identifier "sensors".
-        device.events(sensors, "sensors", QoS.FIRE_AND_FORGET);
-    }
-    
-    /**
-     * Create a heart beat device event with
-     * identifier {@code heartbeat} to
-     * ensure there is some immediate output and
-     * the connection to IoTF happens as soon as possible.
-     * @param device IoT device
-     * @param print true to print generated heartbeat tuples to System.out.
-     */
-    public static void heartBeat(IotDevice device, boolean print) {
-      TStream<JsonObject> hbs = 
-          HeartBeat.addHeartBeat(device, 1, TimeUnit.MINUTES, "heartbeat");
-      if (print)
-        hbs.print();
-    }
-    
-
-    /**
-     * Subscribe to IoTF device commands with identifier {@code display}.
-     * Subscribing to device commands returns a stream of JSON objects that
-     * include a timestamp ({@code tsms}), command identifier ({@code command})
-     * and payload ({@code payload}). Payload is the application specific
-     * portion of the command. <BR>
-     * In this case the payload is expected to be a JSON object containing a
-     * {@code msg} key with a string display message. <BR>
-     * The returned stream consists of the display message string extracted from
-     * the JSON payload.
-     * <P>
-     * Note to receive commands a analytic application must exist that generates
-     * them through IBM Watson IoT Platform.
-     * </P>
-     *
-     * @param device the device
-     * @param print true to print the received command's payload to System.out.
-     * @return the stream
-     * @see IotDevice#commands(String...)
-     */
-    public static TStream<String> displayMessages(IotDevice device, boolean print) {
-        // Subscribe to commands of id "display" for this device
-        TStream<JsonObject> statusMsgs = device.commands("display");
-
-        // The returned JSON object includes several fields
-        // tsms - Timestamp in milliseconds (this is generic to a command)
-        // payload.msg - Status message (this is specific to this application)
-
-        // Map to a String object containing the message
-        TStream<String> messages = statusMsgs.map(j -> j.getAsJsonObject("payload").getAsJsonPrimitive("msg").getAsString());
-        if (print)
-            messages.print();
-        return messages;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/iotf/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/iotf/package-info.java b/samples/connectors/src/main/java/edgent/samples/connectors/iotf/package-info.java
deleted file mode 100644
index 65d7482..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/iotf/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Samples showing device events and commands with IBM Watson IoT Platform.
- */
-package edgent.samples.connectors.iotf;
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/DbUtils.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/DbUtils.java b/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/DbUtils.java
deleted file mode 100644
index 4bb858d..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/DbUtils.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.jdbc;
-
-import java.lang.reflect.Method;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Properties;
-
-import javax.sql.DataSource;
-
-/**
- * Utilities for the sample's non-streaming JDBC database related actions.
- */
-public class DbUtils {
-    
-    /**
-     * Get the JDBC {@link DataSource} for the database.
-     * <p>
-     * The "db.name" property specifies the name of the database.
-     * Defaults to "JdbcConnectorSampleDb".
-     * 
-     * @param props configuration properties
-     * @return the DataSource
-     * @throws Exception on failure
-     */
-    public static DataSource getDataSource(Properties props) throws Exception {
-        return createDerbyEmbeddedDataSource(props);
-    }
-    
-    /**
-     * Initialize the sample's database.
-     * <p>
-     * Tables are created as needed and purged.
-     * @param ds the DataSource
-     * @throws Exception on failure
-     */
-    public static void initDb(DataSource ds) throws Exception {
-        createTables(ds);
-        purgeTables(ds);
-    }
-    
-    /**
-     * Purge the sample's tables
-     * @param ds the DataSource
-     * @throws Exception on failure
-     */
-    public static void purgeTables(DataSource ds) throws Exception {
-        try (Connection cn = ds.getConnection()) {
-            Statement stmt = cn.createStatement();
-            stmt.execute("DELETE FROM persons");
-        }
-    }
-
-    private static void createTables(DataSource ds) throws Exception {
-        try (Connection cn = ds.getConnection()) {
-            Statement stmt = cn.createStatement();
-            stmt.execute("CREATE TABLE persons "
-                    + "("
-                    + "id INTEGER NOT NULL,"
-                    + "firstname VARCHAR(40) NOT NULL,"
-                    + "lastname VARCHAR(40) NOT NULL,"
-                    + "PRIMARY KEY (id)"
-                    + ")"
-                    );
-        }
-        catch (SQLException e) {
-            if (e.getLocalizedMessage().contains("already exists"))
-                return;
-            else
-                throw e;
-        }
-   }
-
-   private static DataSource createDerbyEmbeddedDataSource(Properties props) throws Exception
-   {
-       String dbName = props.getProperty("db.name", "JdbcConnectorSampleDb");
-       
-       // For our sample, avoid a compile-time dependency to the jdbc driver.
-       // At runtime, require that the classpath can find it.
-
-       String DERBY_DATA_SOURCE = "org.apache.derby.jdbc.EmbeddedDataSource";
-   
-       Class<?> nsDataSource = null;
-       try {
-           nsDataSource = Class.forName(DERBY_DATA_SOURCE);
-       }
-       catch (ClassNotFoundException e) {
-           String msg = "Fix the test classpath. ";
-           if (System.getenv("DERBY_HOME") == null) {
-               msg += "DERBY_HOME not set. ";
-           }
-           msg += "Class not found: "+e.getLocalizedMessage();
-           System.err.println(msg);
-           throw new IllegalStateException(msg);
-       }
-       DataSource ds = (DataSource) nsDataSource.newInstance();
-
-       @SuppressWarnings("rawtypes")
-       Class[] methodParams = new Class[] {String.class};
-       Method dbname = nsDataSource.getMethod("setDatabaseName", methodParams);
-       Object[] args = new Object[] {dbName};
-       dbname.invoke(ds, args);
-
-       // create the db if necessary
-       Method create = nsDataSource.getMethod("setCreateDatabase", methodParams);
-       args = new Object[] {"create"};
-       create.invoke(ds, args);
-
-       // set the user
-       Method setuser = nsDataSource.getMethod("setUser", methodParams);
-       args = new Object[] { props.getProperty("db.user", System.getProperty("user.name")) };
-       setuser.invoke(ds, args);
-
-       // optionally set the pw
-       Method setpw = nsDataSource.getMethod("setPassword", methodParams);
-       args = new Object[] { props.getProperty("db.password") };
-       if (args[0] != null)
-           setpw.invoke(ds, args);
-   
-       return ds;
-   }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/Person.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/Person.java b/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/Person.java
deleted file mode 100644
index aaffbac..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/Person.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.jdbc;
-
-/**
- * A Person object for the sample.
- */
-public class Person {
-    int id;
-    String firstName;
-    String lastName;
-    Person(int id, String first, String last) {
-        this.id = id;
-        this.firstName = first;
-        this.lastName = last;
-    }
-    public String toString() {
-        return String.format("id=%d first=%s last=%s",
-                id, firstName, lastName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/PersonData.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/PersonData.java b/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/PersonData.java
deleted file mode 100644
index 4d933c6..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/PersonData.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.jdbc;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-/**
- * Utilities for loading the sample's person data.
- */
-public class PersonData {
-    
-    /**
-     * Load the person data from the path specified by the "persondata.path"
-     * property.
-     * @param props configuration properties
-     * @return the loaded person data
-     * @throws Exception on failure
-     */
-    public static List<Person> loadPersonData(Properties props) throws Exception {
-        String pathname = props.getProperty("persondata.path");
-        List<Person> persons = new ArrayList<>();
-        Path path = new File(pathname).toPath();
-        try (BufferedReader br = Files.newBufferedReader(path)) {
-            int lineno = 0;
-            String line;
-            while ((line = br.readLine()) != null) {
-                lineno++;
-                Object[] fields = parseLine(line, lineno, pathname);
-                if (fields == null)
-                    continue;
-                persons.add(new Person((Integer)fields[0], (String)fields[1], (String)fields[2]));
-            }
-        }
-        return persons;
-    }
-    
-    private static Object[] parseLine(String line, int lineno, String pathname) {
-        line = line.trim();
-        if (line.startsWith("#"))
-            return null;
-
-        // id,firstName,lastName
-        String[] items = line.split(",");
-        if (items.length < 3)
-            throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname);
-        int id;
-        try {
-           id = new Integer(items[0]);
-           if (id < 1)
-               throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname);
-        }
-        catch (NumberFormatException e) {
-            throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname);
-        }
-        
-        Object[] fields = new Object[3];
-        fields[0] = id;
-        fields[1] = items[1].trim();
-        fields[2] = items[2].trim();
-        return fields;
-    }
-
-    /**
-     * Convert a {@code List<Person>} to a {@code List<PersonId>}
-     * @param persons the person list
-     * @return the person id list
-     */
-    public static List<PersonId> toPersonIds(List<Person> persons) {
-        return persons.stream()
-            .map(person -> new PersonId(person.id))
-            .collect(Collectors.toList());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/PersonId.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/PersonId.java b/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/PersonId.java
deleted file mode 100644
index e9cf150..0000000
--- a/samples/connectors/src/main/java/edgent/samples/connectors/jdbc/PersonId.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.samples.connectors.jdbc;
-
-/**
- * Another class containing a person id for the sample.
- */
-public class PersonId {
-    int id;
-    PersonId(int id) {
-        this.id = id;
-    }
-    public String toString() {
-        return String.format("id=%d", id);
-    }
-}