You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/13 17:26:48 UTC
[3/3] incubator-quarks git commit: generate ISO8601 date/time and
simplify addHeartBeat()
generate ISO8601 date/time and simplify addHeartBeat()
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/fd524657
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/fd524657
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/fd524657
Branch: refs/heads/master
Commit: fd5246572d854bb8030425c327abddfb6bb1d77e
Parents: 31968b6
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue Jul 12 14:25:17 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed Jul 13 11:23:15 2016 -0400
----------------------------------------------------------------------
.../java/quarks/connectors/iot/HeartBeat.java | 42 ++++++++------------
.../samples/connectors/iotf/IotfSensors.java | 6 ++-
2 files changed, 20 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fd524657/connectors/iot/src/main/java/quarks/connectors/iot/HeartBeat.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/quarks/connectors/iot/HeartBeat.java b/connectors/iot/src/main/java/quarks/connectors/iot/HeartBeat.java
index 7fc7b70..0820472 100644
--- a/connectors/iot/src/main/java/quarks/connectors/iot/HeartBeat.java
+++ b/connectors/iot/src/main/java/quarks/connectors/iot/HeartBeat.java
@@ -18,12 +18,14 @@ under the License.
*/
package quarks.connectors.iot;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import com.google.gson.JsonObject;
-import quarks.function.Consumer;
import quarks.function.Functions;
import quarks.topology.TStream;
import quarks.topology.plumbing.PlumbingStreams;
@@ -52,35 +54,30 @@ public class HeartBeat {
* The heart beat's event payload is the JSON for a JsonObject with the
* heart beat's properties:
* <ul>
- * <li>"when" : (string) {@link Date#toString()}</li>
+ * <li>"when" : (string) ISO8601 UTC date/time. e.g. "2016-07-12T17:57:08Z"</li>
* <li>"time" : (number) {@link System#currentTimeMillis()}</li>
* </ul>
*
- * When {@code consumer} is non-null, {@code consumer.accept()} is called
- * with the {@code TStream<JsonObject>} created for heart beat events.
- * </P>
- *
* @param iotDevice IoT hub device
* @param period the heart beat period
* @param unit TimeUnit for the period
* @param eventId the IotDevice eventId to use for the event
- * @param consumer
+ * @return the {@code TStream<JsonObject>} heartbeat stream
*/
- public static void addHeartBeat(IotDevice iotDevice, long period, TimeUnit unit, String eventId, Consumer<TStream<JsonObject>> consumer) {
+ public static TStream<JsonObject> addHeartBeat(IotDevice iotDevice, long period, TimeUnit unit, String eventId) {
+ DateFormat df = newIso8601Formatter();
TStream<Date> hb = iotDevice.topology().poll(
() -> new Date(),
period, unit).tag("heartbeat");
// Convert to JSON
TStream<JsonObject> hbj = hb.map(date -> {
JsonObject j = new JsonObject();
- j.addProperty("when", date.toString());
+ j.addProperty("when", df.format(date));
j.addProperty("time", date.getTime());
return j;
}).tag("heartbeat");
- if (consumer != null) {
- consumer.accept(hbj);
- }
+ TStream<JsonObject> hbs = hbj;
// Tolerate connection outages. Don't block upstream processing
// and retain the most recent heartbeat if unable to publish.
@@ -88,22 +85,15 @@ public class HeartBeat {
Functions.unpartitioned(), 1).tag("pressureRelieved");
iotDevice.events(hbj, eventId, QoS.FIRE_AND_FORGET);
+
+ return hbs;
}
- /**
- * Add IoT device heart beat processing to a topology.
- * <P>
- * Same as {@link #addHeartBeat(IotDevice, long, TimeUnit, String, Consumer)}
- * with a null consumer parameter.
- * </P>
- *
- * @param iotDevice IoT hub device
- * @param period the heart beat period
- * @param unit TimeUnit for the period
- * @param eventId the IotDevice eventId to use for the event
- */
- public static void addHeartBeat(IotDevice iotDevice, long period, TimeUnit unit, String eventId) {
- addHeartBeat(iotDevice, period, unit, eventId, null);
+ private static DateFormat newIso8601Formatter() {
+ // Quoted "Z" to indicate UTC, no timezone offset
+ DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return df;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fd524657/samples/connectors/src/main/java/quarks/samples/connectors/iotf/IotfSensors.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/quarks/samples/connectors/iotf/IotfSensors.java b/samples/connectors/src/main/java/quarks/samples/connectors/iotf/IotfSensors.java
index 352766d..f302cd9 100644
--- a/samples/connectors/src/main/java/quarks/samples/connectors/iotf/IotfSensors.java
+++ b/samples/connectors/src/main/java/quarks/samples/connectors/iotf/IotfSensors.java
@@ -119,8 +119,10 @@ public class IotfSensors {
* @param print true to print generated heartbeat tuples to System.out.
*/
public static void heartBeat(IotDevice device, boolean print) {
- HeartBeat.addHeartBeat(device, 1, TimeUnit.MINUTES,
- "heartbeat", stream -> { if (print) stream.print(); });
+ TStream<JsonObject> hbs =
+ HeartBeat.addHeartBeat(device, 1, TimeUnit.MINUTES, "heartbeat");
+ if (print)
+ hbs.print();
}