You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/13 17:26:48 UTC

[3/3] incubator-quarks git commit: generate ISO8601 date/time and simplify addHeartBeat()

generate ISO8601 date/time and simplify addHeartBeat()


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/fd524657
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/fd524657
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/fd524657

Branch: refs/heads/master
Commit: fd5246572d854bb8030425c327abddfb6bb1d77e
Parents: 31968b6
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue Jul 12 14:25:17 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed Jul 13 11:23:15 2016 -0400

----------------------------------------------------------------------
 .../java/quarks/connectors/iot/HeartBeat.java   | 42 ++++++++------------
 .../samples/connectors/iotf/IotfSensors.java    |  6 ++-
 2 files changed, 20 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fd524657/connectors/iot/src/main/java/quarks/connectors/iot/HeartBeat.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/quarks/connectors/iot/HeartBeat.java b/connectors/iot/src/main/java/quarks/connectors/iot/HeartBeat.java
index 7fc7b70..0820472 100644
--- a/connectors/iot/src/main/java/quarks/connectors/iot/HeartBeat.java
+++ b/connectors/iot/src/main/java/quarks/connectors/iot/HeartBeat.java
@@ -18,12 +18,14 @@ under the License.
 */
 package quarks.connectors.iot;
 
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
 import com.google.gson.JsonObject;
 
-import quarks.function.Consumer;
 import quarks.function.Functions;
 import quarks.topology.TStream;
 import quarks.topology.plumbing.PlumbingStreams;
@@ -52,35 +54,30 @@ public class HeartBeat {
    * The heart beat's event payload is the JSON for a JsonObject with the
    * heart beat's properties:
    * <ul>
-   * <li>"when" : (string) {@link Date#toString()}</li>
+   * <li>"when" : (string) ISO8601 UTC date/time. e.g. "2016-07-12T17:57:08Z"</li>
    * <li>"time" : (number) {@link System#currentTimeMillis()}</li>
    * </ul> 
    * 
-   * When {@code consumer} is non-null, {@code consumer.accept()} is called
-   * with the {@code TStream<JsonObject>} created for heart beat events.
-   * </P>
-   * 
    * @param iotDevice IoT hub device
    * @param period the heart beat period
    * @param unit TimeUnit for the period
    * @param eventId the IotDevice eventId to use for the event
-   * @param consumer 
+   * @return the {@code TStream<JsonObject>} heartbeat stream
    */
-  public static void addHeartBeat(IotDevice iotDevice, long period, TimeUnit unit, String eventId, Consumer<TStream<JsonObject>> consumer) {
+  public static TStream<JsonObject> addHeartBeat(IotDevice iotDevice, long period, TimeUnit unit, String eventId) {
+    DateFormat df = newIso8601Formatter();
     TStream<Date> hb = iotDevice.topology().poll(
         () -> new Date(),
         period, unit).tag("heartbeat");
     // Convert to JSON
     TStream<JsonObject> hbj = hb.map(date -> {
         JsonObject j = new  JsonObject();
-        j.addProperty("when", date.toString());
+        j.addProperty("when", df.format(date));
         j.addProperty("time", date.getTime());
         return j;
     }).tag("heartbeat");
     
-    if (consumer != null) {
-      consumer.accept(hbj);
-    }
+    TStream<JsonObject> hbs = hbj;
   
     // Tolerate connection outages.  Don't block upstream processing
     // and retain the most recent heartbeat if unable to publish.
@@ -88,22 +85,15 @@ public class HeartBeat {
                 Functions.unpartitioned(), 1).tag("pressureRelieved");
   
     iotDevice.events(hbj, eventId, QoS.FIRE_AND_FORGET);
+    
+    return hbs;
   }
   
-  /**
-   * Add IoT device heart beat processing to a topology.
-   * <P>
-   * Same as {@link #addHeartBeat(IotDevice, long, TimeUnit, String, Consumer)}
-   * with a null consumer parameter.
-   * </P>
-   * 
-   * @param iotDevice IoT hub device
-   * @param period the heart beat period
-   * @param unit TimeUnit for the period
-   * @param eventId the IotDevice eventId to use for the event
-   */
-  public static void addHeartBeat(IotDevice iotDevice, long period, TimeUnit unit, String eventId) {
-    addHeartBeat(iotDevice, period, unit, eventId, null);
+  private static DateFormat newIso8601Formatter() {
+    // Quoted "Z" to indicate UTC, no timezone offset
+    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+    df.setTimeZone(TimeZone.getTimeZone("UTC"));
+    return df;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fd524657/samples/connectors/src/main/java/quarks/samples/connectors/iotf/IotfSensors.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/quarks/samples/connectors/iotf/IotfSensors.java b/samples/connectors/src/main/java/quarks/samples/connectors/iotf/IotfSensors.java
index 352766d..f302cd9 100644
--- a/samples/connectors/src/main/java/quarks/samples/connectors/iotf/IotfSensors.java
+++ b/samples/connectors/src/main/java/quarks/samples/connectors/iotf/IotfSensors.java
@@ -119,8 +119,10 @@ public class IotfSensors {
      * @param print true to print generated heartbeat tuples to System.out.
      */
     public static void heartBeat(IotDevice device, boolean print) {
-      HeartBeat.addHeartBeat(device, 1, TimeUnit.MINUTES,
-          "heartbeat", stream -> { if (print) stream.print(); });
+      TStream<JsonObject> hbs = 
+          HeartBeat.addHeartBeat(device, 1, TimeUnit.MINUTES, "heartbeat");
+      if (print)
+        hbs.print();
     }