You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/03/21 21:34:50 UTC

[1/2] incubator-quarks git commit: cleanup

Repository: incubator-quarks
Updated Branches:
  refs/heads/master 3f95e5d8f -> cf1c70d1b


cleanup

- remove "batch" window workaround
- improve stream tagging for console use

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/fb87f0aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/fb87f0aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/fb87f0aa

Branch: refs/heads/master
Commit: fb87f0aa5f6fcdc185cf6aaec9626ca4a131064a
Parents: bcbce8a
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon Mar 21 16:13:18 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Mon Mar 21 16:13:18 2016 -0400

----------------------------------------------------------------------
 .../quarks/connectors/mqtt/iot/MqttDevice.java  |  3 +-
 .../samples/apps/ApplicationUtilities.java      |  4 +-
 .../java/quarks/samples/apps/JsonTuples.java    | 83 --------------------
 .../samples/apps/sensorAnalytics/Sensor1.java   | 37 +++++----
 4 files changed, 25 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fb87f0aa/connectors/mqtt/src/main/java/quarks/connectors/mqtt/iot/MqttDevice.java
----------------------------------------------------------------------
diff --git a/connectors/mqtt/src/main/java/quarks/connectors/mqtt/iot/MqttDevice.java b/connectors/mqtt/src/main/java/quarks/connectors/mqtt/iot/MqttDevice.java
index 7a9d1f7..c897b92 100644
--- a/connectors/mqtt/src/main/java/quarks/connectors/mqtt/iot/MqttDevice.java
+++ b/connectors/mqtt/src/main/java/quarks/connectors/mqtt/iot/MqttDevice.java
@@ -238,7 +238,8 @@ public class MqttDevice implements IotDevice {
                             jo.addProperty(CMD_PAYLOAD, new String(payload, StandardCharsets.UTF_8));
                         }
                         return jo;
-                    });
+                    })
+                    .tag("allDeviceCmds");
         }
         return commandStream;
     }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fb87f0aa/samples/apps/src/main/java/quarks/samples/apps/ApplicationUtilities.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/quarks/samples/apps/ApplicationUtilities.java b/samples/apps/src/main/java/quarks/samples/apps/ApplicationUtilities.java
index 6f63a96..084712c 100644
--- a/samples/apps/src/main/java/quarks/samples/apps/ApplicationUtilities.java
+++ b/samples/apps/src/main/java/quarks/samples/apps/ApplicationUtilities.java
@@ -70,7 +70,7 @@ public class ApplicationUtilities {
      */
     public <T> TStream<T> traceStream(TStream<T> stream, Supplier<String> label) {
         if (includeTraceStreamOps(label.get())) {
-            TStream<?> s = stream.filter(traceTuplesFn(label.get()));
+            TStream<?> s = stream.filter(traceTuplesFn(label.get())).tag(label.get()+".trace");
             s.peek(sample -> System.out.println(String.format("%s: %s", label.get(), sample.toString())));
         }
         return stream;
@@ -168,7 +168,7 @@ public class ApplicationUtilities {
          
         // Transform the stream to a TStream<String> of string log entry values
         TStream<String> stringEntries = stream.map(sample -> String.format("[%s] [%s] %s", new Date().toString(), eventTag, sample.toString()))
-                .tag("log."+baseName);
+                .tag(baseName+".log");
 
         // Use the FileStreams connector to write the logs.
         //

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fb87f0aa/samples/apps/src/main/java/quarks/samples/apps/JsonTuples.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/quarks/samples/apps/JsonTuples.java b/samples/apps/src/main/java/quarks/samples/apps/JsonTuples.java
index 370308e..475a8bd 100644
--- a/samples/apps/src/main/java/quarks/samples/apps/JsonTuples.java
+++ b/samples/apps/src/main/java/quarks/samples/apps/JsonTuples.java
@@ -13,7 +13,6 @@ import java.util.List;
 
 import org.apache.commons.math3.util.Pair;
 
-import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 
@@ -168,44 +167,6 @@ public class JsonTuples {
                 };
     }
 
-    /**
-     * Create a function that creates a new sample containing the collection
-     * of input sample readings.
-     * <p>
-     * The single sample collects the all of the {@code JsonTuple.KEY_READING} 
-     * values from {@code List<JsonObject>} into a single {@code JsonArray}.
-     * <p> 
-     * The resulting sample contains the properties:
-     * <ul>
-     * <li>JsonTuple.KEY_ID</li>
-     * <li>JsonTuple.KEY_MSEC - msecTimestamp of the last sample in the window</li>
-     * <li>JsonTuple.KEY_AGG_BEGIN_MSEC - msecTimestamp of the first sample in the window</li>
-     * <li>JsonTuple.KEY_AGG_COUNT - number of samples in the window ({@code value=factor})</li>
-     * <li>JsonTuple.KEY_READING - a JsonArray of each JsonObject's KEY_READING reading</li>
-     * </ul>
-     * <p>
-     * Sample use:
-     * <pre>{@code
-     * TStream<JsonObject> s = ...
-     * // reduce s by a factor of 100 
-     * TStream<JsonObject> reduced = batch(s, 100, collect());
-     * }</pre>
-     * @return the accumulate function
-     * @see #batch(TStream, int, BiFunction)
-     */
-    public static BiFunction<List<JsonObject>,String,JsonObject> collect() {
-        return (samples, key) -> {
-            JsonObject jo = new JsonObject();
-            JsonTuples.addAggStdInfo(jo, samples);
-            JsonArray ja = new JsonArray();
-            for (JsonObject j : samples) {
-                ja.add(j.get(KEY_READING));
-            }
-            jo.add(KEY_READING, ja);
-            return jo;
-        };
-    }
-
     private static void addAggStdInfo(JsonObject jo, List<JsonObject> samples) {
         // beginMsec, endMsec, nSamples
         long msec = samples.get(0).get(KEY_TS).getAsLong();
@@ -217,48 +178,4 @@ public class JsonTuples {
         jo.addProperty(KEY_AGG_COUNT, nSamples);
     }
 
-    /**
-     * Process a window of tuples in batches.  
-     * TODO REMOVE ONCE TStream.batch(int) or such exists.
-     * <p>
-     * Accumulate a window of {@code size} tuples, 
-     * invoke {@code batcher.apply()} on the batch, clear the window,
-     * and repeat for the next batch.
-     * <p>
-     * Useful for applying analytics to reduce the number of samples
-     * by a factor of {@code size} - e.g., 100:1.  Typical reductions
-     * compute some characteristic statistic for the batch, e.g., a mean,
-     * or simply collect the samples into a single sample to then
-     * process by some other complex analytic such as an FFT.
-     * <p>
-     * Sample use:
-     * <pre>{@code
-     * TStream<JsonObject> s = ...
-     * // reduce s by a factor of 100 
-     * TStream<JsonObject> reduced = Reducers.batch(s, 100, 
-     *          JsonTuples.statistics(Statistic.MEAN, Statistic.STDDEV));
-     * }</pre>
-     * 
-     * @param stream the stream to reduce
-     * @param size the batch size
-     * @param batcher function to perform the aggregation
-     * @return {@code TStream<JsonObject>} for the reduced {@code stream}
-     */
-    public static TStream<JsonObject> batch(TStream<JsonObject> stream, int size, BiFunction<List<JsonObject>,String,JsonObject> batcher) {
-        // workaround for omission of batched windows
-        int[] cnt = new int[] {0};
-        return stream.last(size, keyFn())
-                .aggregate(
-                        (samples, key) -> {
-                            if (++cnt[0] >= size) {
-                                cnt[0] = 0;
-                                return batcher.apply(samples, key);
-                            }
-                            else
-                                return null;
-                        })
-                // workaround issue#133 where aggregate doesn't ignore null fn results
-                .filter(sample -> sample != null);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fb87f0aa/samples/apps/src/main/java/quarks/samples/apps/sensorAnalytics/Sensor1.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/quarks/samples/apps/sensorAnalytics/Sensor1.java b/samples/apps/src/main/java/quarks/samples/apps/sensorAnalytics/Sensor1.java
index dbd691b..2b737bf 100644
--- a/samples/apps/src/main/java/quarks/samples/apps/sensorAnalytics/Sensor1.java
+++ b/samples/apps/src/main/java/quarks/samples/apps/sensorAnalytics/Sensor1.java
@@ -100,14 +100,14 @@ public class Sensor1 {
         
         // Handle the sensor's device commands
         app.mqttDevice().commands(commandId("set1hzMeanRangeThreshold"))
-            .tag(commandId("set1hzMeanRangeThreshold"))
+            .tag(commandId("set1hzMeanRangeThresholdCmd"))
             .sink(jo -> {
                     Range<Integer> newRange = Range.valueOf(getCommandValue(jo), Integer.class);
                     System.out.println("===== Changing range to "+newRange+" ======");
                     range.set(newRange);
                 });
         app.mqttDevice().commands(commandId("setPublish1hzOutsideRange"))
-            .tag(commandId("setPublish1hzOutsideRange"))
+            .tag(commandId("setPublish1hzOutsideRangeCmd"))
             .sink(jo -> {
                     Boolean b = new Boolean(getCommandValue(jo));
                     System.out.println("===== Changing isPublish1hzOutsideRange to "+b+" ======");
@@ -123,14 +123,15 @@ public class Sensor1 {
         traceStream(raw1khz, "raw1khz");
         
         // Wrap the raw sensor reading in a JsonObject for convenience.
-        TStream<JsonObject> j1khz = JsonTuples.wrap(raw1khz, sensorId);
+        TStream<JsonObject> j1khz = JsonTuples.wrap(raw1khz, sensorId)
+                .tag("j1khz");
         traceStream(j1khz, "j1khz");
         
         // Data-reduction: reduce 1khz samples down to
         // 1hz aggregate statistics samples.
-        TStream<JsonObject> j1hzStats = JsonTuples.batch(j1khz, 1000,
-                                JsonTuples.statistics(MIN, MAX, MEAN, STDDEV))
-                 .tag("1hzStats");
+        TStream<JsonObject> j1hzStats = j1khz.last(1000, JsonTuples.keyFn())
+                .batch(JsonTuples.statistics(MIN, MAX, MEAN, STDDEV))
+                .tag("1hzStats");
         
         // Create a 30 second sliding window of average trailing Mean values
         // and enrich samples with that information.
@@ -145,7 +146,8 @@ public class Sensor1 {
                 jo.addProperty("AvgTrailingMean", Math.round(meanSum / samples.size()));
                 jo.addProperty("AvgTrailingMeanCnt", samples.size());
                 return jo;
-            });
+            })
+            .tag("1hzStats.enriched");
         traceStream(j1hzStats, "j1hzStats");
 
         // Detect 1hz samples whose MEAN value are
@@ -154,7 +156,8 @@ public class Sensor1 {
                 sample -> {
                     int value = JsonTuples.getStatistic(sample, MEAN).getAsInt();
                     return !range.get().contains(value);
-                }).tag("outside1hzMeanRange");
+                })
+                .tag("outside1hzMeanRange");
         traceStream(outside1hzMeanRange, () -> "outside1hzMeanRange"+range.get()); 
         
         // Log every outside1hzMeanRange event
@@ -166,8 +169,9 @@ public class Sensor1 {
         // TODO enhance MqttDevice with configurable reliever. 
         app.mqttDevice().events(
                 PlumbingStreams.pressureReliever(
-                    outside1hzMeanRange.filter(tuple -> isPublish1hzOutsideRange.get()),
-                    tuple -> 0, 30),
+                    outside1hzMeanRange.filter(tuple -> isPublish1hzOutsideRange.get())
+                                       .tag("outside1hzMeanRangeEvent.conditional"),
+                    tuple -> 0, 30).tag("outside1hzMeanRangeEvent.pressureRelieved"),
                 app.sensorEventId(sensorId, "outside1hzMeanRangeEvent"), QoS.FIRE_AND_FORGET);
         
         // Demonstrate periodic publishing of a sliding window if
@@ -200,7 +204,7 @@ public class Sensor1 {
         List<JsonObject> lastN = Collections.synchronizedList(new ArrayList<>());
         stream.last(count, JsonTuples.keyFn())
             .aggregate((samples, key) -> samples)
-            .tag("lastN")
+            .tag(event+".lastN")
             .sink(samples -> {
                     // Capture the new list/window.  
                     synchronized(lastN) {
@@ -212,8 +216,8 @@ public class Sensor1 {
         // Publish the lastN (with trimmed down info) every nSec seconds
         // if anything changed since the last publish.
         TStream<JsonObject> periodicLastN = 
-                t.poll(() -> 1, nSec, TimeUnit.SECONDS)
-                .filter(trigger -> !lastN.isEmpty())
+                t.poll(() -> 1, nSec, TimeUnit.SECONDS).tag(event+".trigger")
+                .filter(trigger -> !lastN.isEmpty()).tag(event+".changed")
                 .map(trigger -> {
                     synchronized(lastN) {
                         // create a single JsonObject with the list
@@ -237,15 +241,16 @@ public class Sensor1 {
                         return jo;
                     }
                 })
-                .tag("periodicLastN");
+                .tag(event);
 
-        traceStream(periodicLastN, "periodicLastN-"+event);
+        traceStream(periodicLastN, event);
 
         // Use a pressureReliever to prevent backpressure if the broker
         // can't be contacted.
         // TODO enhance MqttDevice with configurable reliever. 
         app.mqttDevice().events(
-                PlumbingStreams.pressureReliever(periodicLastN, tuple -> 0, 30),
+                PlumbingStreams.pressureReliever(periodicLastN, tuple -> 0, 30)
+                    .tag(event+".pressureRelieved"),
                 app.sensorEventId(sensorId, event), QoS.FIRE_AND_FORGET);
     }
     


[2/2] incubator-quarks git commit: Merge pull request #31

Posted by dl...@apache.org.
Merge pull request #31

This closes #31


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/cf1c70d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/cf1c70d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/cf1c70d1

Branch: refs/heads/master
Commit: cf1c70d1b825eade3821c0a00e3567fbe61bd4c8
Parents: 3f95e5d fb87f0a
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon Mar 21 16:34:24 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Mon Mar 21 16:34:24 2016 -0400

----------------------------------------------------------------------
 .../quarks/connectors/mqtt/iot/MqttDevice.java  |  3 +-
 .../samples/apps/ApplicationUtilities.java      |  4 +-
 .../java/quarks/samples/apps/JsonTuples.java    | 83 --------------------
 .../samples/apps/sensorAnalytics/Sensor1.java   | 37 +++++----
 4 files changed, 25 insertions(+), 102 deletions(-)
----------------------------------------------------------------------