You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/03/21 21:34:50 UTC
[1/2] incubator-quarks git commit: cleanup
Repository: incubator-quarks
Updated Branches:
refs/heads/master 3f95e5d8f -> cf1c70d1b
cleanup
- remove "batch" window workaround
- improve stream tagging for console use
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/fb87f0aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/fb87f0aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/fb87f0aa
Branch: refs/heads/master
Commit: fb87f0aa5f6fcdc185cf6aaec9626ca4a131064a
Parents: bcbce8a
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon Mar 21 16:13:18 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Mon Mar 21 16:13:18 2016 -0400
----------------------------------------------------------------------
.../quarks/connectors/mqtt/iot/MqttDevice.java | 3 +-
.../samples/apps/ApplicationUtilities.java | 4 +-
.../java/quarks/samples/apps/JsonTuples.java | 83 --------------------
.../samples/apps/sensorAnalytics/Sensor1.java | 37 +++++----
4 files changed, 25 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fb87f0aa/connectors/mqtt/src/main/java/quarks/connectors/mqtt/iot/MqttDevice.java
----------------------------------------------------------------------
diff --git a/connectors/mqtt/src/main/java/quarks/connectors/mqtt/iot/MqttDevice.java b/connectors/mqtt/src/main/java/quarks/connectors/mqtt/iot/MqttDevice.java
index 7a9d1f7..c897b92 100644
--- a/connectors/mqtt/src/main/java/quarks/connectors/mqtt/iot/MqttDevice.java
+++ b/connectors/mqtt/src/main/java/quarks/connectors/mqtt/iot/MqttDevice.java
@@ -238,7 +238,8 @@ public class MqttDevice implements IotDevice {
jo.addProperty(CMD_PAYLOAD, new String(payload, StandardCharsets.UTF_8));
}
return jo;
- });
+ })
+ .tag("allDeviceCmds");
}
return commandStream;
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fb87f0aa/samples/apps/src/main/java/quarks/samples/apps/ApplicationUtilities.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/quarks/samples/apps/ApplicationUtilities.java b/samples/apps/src/main/java/quarks/samples/apps/ApplicationUtilities.java
index 6f63a96..084712c 100644
--- a/samples/apps/src/main/java/quarks/samples/apps/ApplicationUtilities.java
+++ b/samples/apps/src/main/java/quarks/samples/apps/ApplicationUtilities.java
@@ -70,7 +70,7 @@ public class ApplicationUtilities {
*/
public <T> TStream<T> traceStream(TStream<T> stream, Supplier<String> label) {
if (includeTraceStreamOps(label.get())) {
- TStream<?> s = stream.filter(traceTuplesFn(label.get()));
+ TStream<?> s = stream.filter(traceTuplesFn(label.get())).tag(label.get()+".trace");
s.peek(sample -> System.out.println(String.format("%s: %s", label.get(), sample.toString())));
}
return stream;
@@ -168,7 +168,7 @@ public class ApplicationUtilities {
// Transform the stream to a TStream<String> of string log entry values
TStream<String> stringEntries = stream.map(sample -> String.format("[%s] [%s] %s", new Date().toString(), eventTag, sample.toString()))
- .tag("log."+baseName);
+ .tag(baseName+".log");
// Use the FileStreams connector to write the logs.
//
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fb87f0aa/samples/apps/src/main/java/quarks/samples/apps/JsonTuples.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/quarks/samples/apps/JsonTuples.java b/samples/apps/src/main/java/quarks/samples/apps/JsonTuples.java
index 370308e..475a8bd 100644
--- a/samples/apps/src/main/java/quarks/samples/apps/JsonTuples.java
+++ b/samples/apps/src/main/java/quarks/samples/apps/JsonTuples.java
@@ -13,7 +13,6 @@ import java.util.List;
import org.apache.commons.math3.util.Pair;
-import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@@ -168,44 +167,6 @@ public class JsonTuples {
};
}
- /**
- * Create a function that creates a new sample containing the collection
- * of input sample readings.
- * <p>
- * The single sample collects the all of the {@code JsonTuple.KEY_READING}
- * values from {@code List<JsonObject>} into a single {@code JsonArray}.
- * <p>
- * The resulting sample contains the properties:
- * <ul>
- * <li>JsonTuple.KEY_ID</li>
- * <li>JsonTuple.KEY_MSEC - msecTimestamp of the last sample in the window</li>
- * <li>JsonTuple.KEY_AGG_BEGIN_MSEC - msecTimestamp of the first sample in the window</li>
- * <li>JsonTuple.KEY_AGG_COUNT - number of samples in the window ({@code value=factor})</li>
- * <li>JsonTuple.KEY_READING - a JsonArray of each JsonObject's KEY_READING reading</li>
- * </ul>
- * <p>
- * Sample use:
- * <pre>{@code
- * TStream<JsonObject> s = ...
- * // reduce s by a factor of 100
- * TStream<JsonObject> reduced = batch(s, 100, collect());
- * }</pre>
- * @return the accumulate function
- * @see #batch(TStream, int, BiFunction)
- */
- public static BiFunction<List<JsonObject>,String,JsonObject> collect() {
- return (samples, key) -> {
- JsonObject jo = new JsonObject();
- JsonTuples.addAggStdInfo(jo, samples);
- JsonArray ja = new JsonArray();
- for (JsonObject j : samples) {
- ja.add(j.get(KEY_READING));
- }
- jo.add(KEY_READING, ja);
- return jo;
- };
- }
-
private static void addAggStdInfo(JsonObject jo, List<JsonObject> samples) {
// beginMsec, endMsec, nSamples
long msec = samples.get(0).get(KEY_TS).getAsLong();
@@ -217,48 +178,4 @@ public class JsonTuples {
jo.addProperty(KEY_AGG_COUNT, nSamples);
}
- /**
- * Process a window of tuples in batches.
- * TODO REMOVE ONCE TStream.batch(int) or such exists.
- * <p>
- * Accumulate a window of {@code size} tuples,
- * invoke {@code batcher.apply()} on the batch, clear the window,
- * and repeat for the next batch.
- * <p>
- * Useful for applying analytics to reduce the number of samples
- * by a factor of {@code size} - e.g., 100:1. Typical reductions
- * compute some characteristic statistic for the batch, e.g., a mean,
- * or simply collect the samples into a single sample to then
- * process by some other complex analytic such as an FFT.
- * <p>
- * Sample use:
- * <pre>{@code
- * TStream<JsonObject> s = ...
- * // reduce s by a factor of 100
- * TStream<JsonObject> reduced = Reducers.batch(s, 100,
- * JsonTuples.statistics(Statistic.MEAN, Statistic.STDDEV));
- * }</pre>
- *
- * @param stream the stream to reduce
- * @param size the batch size
- * @param batcher function to perform the aggregation
- * @return {@code TStream<JsonObject>} for the reduced {@code stream}
- */
- public static TStream<JsonObject> batch(TStream<JsonObject> stream, int size, BiFunction<List<JsonObject>,String,JsonObject> batcher) {
- // workaround for omission of batched windows
- int[] cnt = new int[] {0};
- return stream.last(size, keyFn())
- .aggregate(
- (samples, key) -> {
- if (++cnt[0] >= size) {
- cnt[0] = 0;
- return batcher.apply(samples, key);
- }
- else
- return null;
- })
- // workaround issue#133 where aggregate doesn't ignore null fn results
- .filter(sample -> sample != null);
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fb87f0aa/samples/apps/src/main/java/quarks/samples/apps/sensorAnalytics/Sensor1.java
----------------------------------------------------------------------
diff --git a/samples/apps/src/main/java/quarks/samples/apps/sensorAnalytics/Sensor1.java b/samples/apps/src/main/java/quarks/samples/apps/sensorAnalytics/Sensor1.java
index dbd691b..2b737bf 100644
--- a/samples/apps/src/main/java/quarks/samples/apps/sensorAnalytics/Sensor1.java
+++ b/samples/apps/src/main/java/quarks/samples/apps/sensorAnalytics/Sensor1.java
@@ -100,14 +100,14 @@ public class Sensor1 {
// Handle the sensor's device commands
app.mqttDevice().commands(commandId("set1hzMeanRangeThreshold"))
- .tag(commandId("set1hzMeanRangeThreshold"))
+ .tag(commandId("set1hzMeanRangeThresholdCmd"))
.sink(jo -> {
Range<Integer> newRange = Range.valueOf(getCommandValue(jo), Integer.class);
System.out.println("===== Changing range to "+newRange+" ======");
range.set(newRange);
});
app.mqttDevice().commands(commandId("setPublish1hzOutsideRange"))
- .tag(commandId("setPublish1hzOutsideRange"))
+ .tag(commandId("setPublish1hzOutsideRangeCmd"))
.sink(jo -> {
Boolean b = new Boolean(getCommandValue(jo));
System.out.println("===== Changing isPublish1hzOutsideRange to "+b+" ======");
@@ -123,14 +123,15 @@ public class Sensor1 {
traceStream(raw1khz, "raw1khz");
// Wrap the raw sensor reading in a JsonObject for convenience.
- TStream<JsonObject> j1khz = JsonTuples.wrap(raw1khz, sensorId);
+ TStream<JsonObject> j1khz = JsonTuples.wrap(raw1khz, sensorId)
+ .tag("j1khz");
traceStream(j1khz, "j1khz");
// Data-reduction: reduce 1khz samples down to
// 1hz aggregate statistics samples.
- TStream<JsonObject> j1hzStats = JsonTuples.batch(j1khz, 1000,
- JsonTuples.statistics(MIN, MAX, MEAN, STDDEV))
- .tag("1hzStats");
+ TStream<JsonObject> j1hzStats = j1khz.last(1000, JsonTuples.keyFn())
+ .batch(JsonTuples.statistics(MIN, MAX, MEAN, STDDEV))
+ .tag("1hzStats");
// Create a 30 second sliding window of average trailing Mean values
// and enrich samples with that information.
@@ -145,7 +146,8 @@ public class Sensor1 {
jo.addProperty("AvgTrailingMean", Math.round(meanSum / samples.size()));
jo.addProperty("AvgTrailingMeanCnt", samples.size());
return jo;
- });
+ })
+ .tag("1hzStats.enriched");
traceStream(j1hzStats, "j1hzStats");
// Detect 1hz samples whose MEAN value are
@@ -154,7 +156,8 @@ public class Sensor1 {
sample -> {
int value = JsonTuples.getStatistic(sample, MEAN).getAsInt();
return !range.get().contains(value);
- }).tag("outside1hzMeanRange");
+ })
+ .tag("outside1hzMeanRange");
traceStream(outside1hzMeanRange, () -> "outside1hzMeanRange"+range.get());
// Log every outside1hzMeanRange event
@@ -166,8 +169,9 @@ public class Sensor1 {
// TODO enhance MqttDevice with configurable reliever.
app.mqttDevice().events(
PlumbingStreams.pressureReliever(
- outside1hzMeanRange.filter(tuple -> isPublish1hzOutsideRange.get()),
- tuple -> 0, 30),
+ outside1hzMeanRange.filter(tuple -> isPublish1hzOutsideRange.get())
+ .tag("outside1hzMeanRangeEvent.conditional"),
+ tuple -> 0, 30).tag("outside1hzMeanRangeEvent.pressureRelieved"),
app.sensorEventId(sensorId, "outside1hzMeanRangeEvent"), QoS.FIRE_AND_FORGET);
// Demonstrate periodic publishing of a sliding window if
@@ -200,7 +204,7 @@ public class Sensor1 {
List<JsonObject> lastN = Collections.synchronizedList(new ArrayList<>());
stream.last(count, JsonTuples.keyFn())
.aggregate((samples, key) -> samples)
- .tag("lastN")
+ .tag(event+".lastN")
.sink(samples -> {
// Capture the new list/window.
synchronized(lastN) {
@@ -212,8 +216,8 @@ public class Sensor1 {
// Publish the lastN (with trimmed down info) every nSec seconds
// if anything changed since the last publish.
TStream<JsonObject> periodicLastN =
- t.poll(() -> 1, nSec, TimeUnit.SECONDS)
- .filter(trigger -> !lastN.isEmpty())
+ t.poll(() -> 1, nSec, TimeUnit.SECONDS).tag(event+".trigger")
+ .filter(trigger -> !lastN.isEmpty()).tag(event+".changed")
.map(trigger -> {
synchronized(lastN) {
// create a single JsonObject with the list
@@ -237,15 +241,16 @@ public class Sensor1 {
return jo;
}
})
- .tag("periodicLastN");
+ .tag(event);
- traceStream(periodicLastN, "periodicLastN-"+event);
+ traceStream(periodicLastN, event);
// Use a pressureReliever to prevent backpressure if the broker
// can't be contacted.
// TODO enhance MqttDevice with configurable reliever.
app.mqttDevice().events(
- PlumbingStreams.pressureReliever(periodicLastN, tuple -> 0, 30),
+ PlumbingStreams.pressureReliever(periodicLastN, tuple -> 0, 30)
+ .tag(event+".pressureRelieved"),
app.sensorEventId(sensorId, event), QoS.FIRE_AND_FORGET);
}
[2/2] incubator-quarks git commit: Merge pull request #31
Posted by dl...@apache.org.
Merge pull request #31
This closes #31
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/cf1c70d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/cf1c70d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/cf1c70d1
Branch: refs/heads/master
Commit: cf1c70d1b825eade3821c0a00e3567fbe61bd4c8
Parents: 3f95e5d fb87f0a
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon Mar 21 16:34:24 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Mon Mar 21 16:34:24 2016 -0400
----------------------------------------------------------------------
.../quarks/connectors/mqtt/iot/MqttDevice.java | 3 +-
.../samples/apps/ApplicationUtilities.java | 4 +-
.../java/quarks/samples/apps/JsonTuples.java | 83 --------------------
.../samples/apps/sensorAnalytics/Sensor1.java | 37 +++++----
4 files changed, 25 insertions(+), 102 deletions(-)
----------------------------------------------------------------------