You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/16 14:24:28 UTC

[1/3] incubator-quarks git commit: Add topology sample for combining streams processing results

Repository: incubator-quarks
Updated Branches:
  refs/heads/master 1228a4b01 -> 9d8338cf6


Add topology sample for combining streams processing results


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/399ea8d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/399ea8d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/399ea8d9

Branch: refs/heads/master
Commit: 399ea8d98031d9b012feed055537713b8294d3fd
Parents: 0fccf26
Author: Queenie Ma <qu...@gmail.com>
Authored: Tue Mar 15 15:10:31 2016 -0700
Committer: Queenie Ma <qu...@gmail.com>
Committed: Tue Mar 15 15:10:31 2016 -0700

----------------------------------------------------------------------
 .../CombiningStreamsProcessingResults.java      | 158 +++++++++++++++++++
 .../utils/sensor/HeartMonitorSensor.java        |  46 ++++++
 2 files changed, 204 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/399ea8d9/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
----------------------------------------------------------------------
diff --git a/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java b/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
new file mode 100644
index 0000000..a28965a
--- /dev/null
+++ b/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
@@ -0,0 +1,158 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2015,2016
+*/
+
+package quarks.samples.topology;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import quarks.console.server.HttpServer;
+import quarks.function.ToIntFunction;
+import quarks.providers.development.DevelopmentProvider;
+import quarks.providers.direct.DirectProvider;
+import quarks.samples.utils.sensor.HeartMonitorSensor;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+/**
+ * Applying different processing against a set of streams and combining the
+ * resulting streams into a single stream.
+ *
+ *  @see HeartMonitorSensor
+ */
+public class CombiningStreamsProcessingResults {
+    /**
+     * Polls a simulated heart monitor to periodically obtain blood pressure readings.
+     * Splits the readings by blood pressure category into separate streams.
+     * Applies different processing on each stream to generate alert streams.
+     * Combines the alert streams into a single stream and prints the alerts.
+     *
+     */
+    public static void main(String[] args) throws Exception {
+        HeartMonitorSensor monitor = new HeartMonitorSensor();
+
+        DirectProvider dp = new DevelopmentProvider();
+
+        System.out.println(dp.getServices().getService(HttpServer.class).getConsoleUrl());
+
+        Topology top = dp.newTopology("heartMonitor");
+
+        // Generate a stream of heart monitor readings
+        TStream<Map<String, Integer>> readings = top
+                .poll(monitor, 1, TimeUnit.MILLISECONDS)
+                .filter(tuple -> tuple.get("Systolic") > 50 && tuple.get("Diastolic") > 30)
+                .filter(tuple -> tuple.get("Systolic") < 200 && tuple.get("Diastolic") < 130);
+
+        // Split the stream by blood pressure category
+        List<TStream<Map<String, Integer>>> categories = readings.split(6, new ToIntFunction<Map<String, Integer>>() {
+            @Override
+            public int applyAsInt(Map<String, Integer> tuple) {
+                if (tuple.get("Systolic") < 120 && tuple.get("Diastolic") < 80) {
+                    // Normal
+                    return 0;
+                } else if ((tuple.get("Systolic") >= 120 && tuple.get("Systolic") <= 139) ||
+                           (tuple.get("Diastolic") >= 80 && tuple.get("Diastolic") <= 89)) {
+                    // Prehypertension
+                    return 1;
+                } else if ((tuple.get("Systolic") >= 140 && tuple.get("Systolic") <= 159) ||
+                           (tuple.get("Diastolic") >= 90 && tuple.get("Diastolic") <= 99)) {
+                    // High Blood Pressure (Hypertension) Stage 1
+                    return 2;
+                } else if ((tuple.get("Systolic") >= 160 && tuple.get("Systolic") <= 179) ||
+                           (tuple.get("Diastolic") >= 100 && tuple.get("Diastolic") <= 109)) {
+                    // High Blood Pressure (Hypertension) Stage 2
+                    return 3;
+                } else if (tuple.get("Systolic") >= 180 && tuple.get("Diastolic") >= 110)  {
+                    // Hypertensive Crisis
+                    return 4;
+                } else {
+                    // Invalid
+                    return -1;
+                }
+            }
+        });
+
+        // Get each individual stream
+        TStream<Map<String, Integer>> normal = categories.get(0).tag("normal");
+        TStream<Map<String, Integer>> prehypertension = categories.get(1).tag("prehypertension");
+        TStream<Map<String, Integer>> hypertension_stage1 = categories.get(2).tag("hypertension_stage1");
+        TStream<Map<String, Integer>> hypertension_stage2 = categories.get(3).tag("hypertension_stage2");
+        TStream<Map<String, Integer>> hypertensive = categories.get(4).tag("hypertensive");
+
+        // Perform analytics on each stream and generate alerts for each blood pressure category
+
+        // Category: Normal
+        TStream<String> normalAlerts = normal
+                .filter(tuple -> tuple.get("Systolic") > 80 && tuple.get("Diastolic") > 50)
+                .tag("normal")
+                .map(tuple -> {
+                    return "All is normal. BP is " + tuple.get("Systolic") + "/" +
+                            tuple.get("Diastolic") + ".\n"; })
+                .tag("normal");
+
+        // Category: Prehypertension category
+        TStream<String> prehypertensionAlerts = prehypertension
+                .map(tuple -> {
+                    return "At high risk for developing hypertension. BP is " +
+                            tuple.get("Systolic") + "/" + tuple.get("Diastolic") + ".\n"; })
+                .tag("prehypertension");
+
+        // Category: High Blood Pressure (Hypertension) Stage 1
+        TStream<String> hypertension_stage1Alerts = hypertension_stage1
+                .map(tuple -> {
+                    return "Monitor closely, patient has high blood pressure. " +
+                           "BP is " + tuple.get("Systolic") + "/" + tuple.get("Diastolic") + ".\n"; })
+                .tag("hypertension_stage1")
+                .modify(tuple -> "High Blood Pressure (Hypertension) Stage 1\n" + tuple)
+                .tag("hypertension_stage1");
+
+        // Category: High Blood Pressure (Hypertension) Stage 2
+        TStream<String> hypertension_stage2Alerts = hypertension_stage2
+                .filter(tuple -> tuple.get("Systolic") >= 170 && tuple.get("Diastolic") >= 105)
+                .tag("hypertension_stage2")
+                .peek(tuple ->
+                    System.out.println("BP: " + tuple.get("Systolic") + "/" + tuple.get("Diastolic")))
+                .map(tuple -> {
+                    return "Warning! Monitor closely, patient is at risk of a hypertensive crisis!\n"; })
+                .tag("hypertension_stage2")
+                .modify(tuple -> "High Blood Pressure (Hypertension) Stage 2\n" + tuple)
+                .tag("hypertension_stage2");
+
+        // Category: Hypertensive Crisis
+        TStream<String> hypertensiveAlerts = hypertensive
+                .filter(tuple -> tuple.get("Systolic") >= 180)
+                .tag("hypertensive")
+                .peek(tuple ->
+                    System.out.println("BP: " + tuple.get("Systolic") + "/" + tuple.get("Diastolic")))
+                .map(tuple -> { return "Emergency! See to patient immediately!\n"; })
+                .tag("hypertensive")
+                .modify(tuple -> tuple.toUpperCase())
+                .tag("hypertensive")
+                .modify(tuple -> "Hypertensive Crisis!!!\n" + tuple)
+                .tag("hypertensive");
+
+        // Union two streams to obtain a single stream containing alerts from the normal and
+        // prehypertension alert streams
+        TStream<String> normalAndPrehypertensionAlerts = normalAlerts.union(prehypertensionAlerts);
+
+        // Set of streams containing alerts from the other categories
+        Set<TStream<String>> otherAlerts = new HashSet<>();
+        otherAlerts.add(hypertension_stage1Alerts);
+        otherAlerts.add(hypertension_stage2Alerts);
+        otherAlerts.add(hypertensiveAlerts);
+
+        // Union a stream with a set of streams to obtain a single stream containing alerts from
+        // all alert streams
+        TStream<String> allAlerts = normalAndPrehypertensionAlerts.union(otherAlerts);
+
+        // Terminate the stream by printing out alerts from all categories
+        allAlerts.sink(tuple -> System.out.println(tuple));
+
+        dp.submit(top);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/399ea8d9/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
----------------------------------------------------------------------
diff --git a/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java b/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
new file mode 100644
index 0000000..421c3f6
--- /dev/null
+++ b/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
@@ -0,0 +1,46 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2015,2016
+*/
+
+package quarks.samples.utils.sensor;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import quarks.function.Supplier;
+
+/**
+ * Simulated heart monitor sensor supplying blood pressure readings.
+ * <p>
+ * Each reading is derived from the previous one, so consecutive readings
+ * drift gradually away from the initial blood pressure of 115/75.
+ */
+public class HeartMonitorSensor implements Supplier<Map<String,Integer>> {
+    // Largest amount by which a pressure may change between two readings
+    private static final int MAX_CHANGE = 2;
+
+    // Most recent pressures, starting at the initial blood pressure
+    public Integer currentSystolic = 115;
+    public Integer currentDiastolic = 75;
+    Random rand;
+
+    public HeartMonitorSensor() {
+        this.rand = new Random();
+    }
+
+    /**
+     * Moves both pressures by a random amount and reports the new values.
+     *
+     * @return a new map holding the updated systolic pressure under the key
+     *         "Systolic" and the updated diastolic pressure under the key
+     *         "Diastolic"
+     */
+    @Override
+    public Map<String, Integer> get() {
+        // The systolic change is drawn before the diastolic one
+        currentSystolic = nextChange() + currentSystolic;
+        currentDiastolic = nextChange() + currentDiastolic;
+
+        Map<String, Integer> reading = new HashMap<>();
+        reading.put("Systolic", currentSystolic);
+        reading.put("Diastolic", currentDiastolic);
+        return reading;
+    }
+
+    // Draws a random whole number between -MAX_CHANGE and MAX_CHANGE inclusive
+    private int nextChange() {
+        return rand.nextInt(MAX_CHANGE + 1 + MAX_CHANGE) - MAX_CHANGE;
+    }
+}


[3/3] incubator-quarks git commit: Merge branch 'pr-14'

Posted by dj...@apache.org.
Merge branch 'pr-14'


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/9d8338cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/9d8338cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/9d8338cf

Branch: refs/heads/master
Commit: 9d8338cf6fda15ea5d05e92ad37e4b4a57e980d1
Parents: 1228a4b 240b430
Author: Daniel J. Debrunner <de...@us.ibm.com>
Authored: Wed Mar 16 06:23:42 2016 -0700
Committer: Daniel J. Debrunner <de...@us.ibm.com>
Committed: Wed Mar 16 06:23:42 2016 -0700

----------------------------------------------------------------------
 .../CombiningStreamsProcessingResults.java      | 171 +++++++++++++++++++
 .../utils/sensor/HeartMonitorSensor.java        |  59 +++++++
 2 files changed, 230 insertions(+)
----------------------------------------------------------------------



[2/3] incubator-quarks git commit: Add ASF license headers and add reasoning for a line of code

Posted by dj...@apache.org.
Add ASF license headers and add reasoning for a line of code


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/240b4308
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/240b4308
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/240b4308

Branch: refs/heads/master
Commit: 240b4308a56274ac22ac7c7f046ad4f2a94a29f0
Parents: 399ea8d
Author: Queenie Ma <qu...@gmail.com>
Authored: Tue Mar 15 16:01:39 2016 -0700
Committer: Queenie Ma <qu...@gmail.com>
Committed: Tue Mar 15 16:01:39 2016 -0700

----------------------------------------------------------------------
 .../CombiningStreamsProcessingResults.java      | 23 +++++++++++++++-----
 .../utils/sensor/HeartMonitorSensor.java        | 19 +++++++++++++---
 2 files changed, 34 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/240b4308/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
----------------------------------------------------------------------
diff --git a/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java b/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
index a28965a..6bcebe6 100644
--- a/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
+++ b/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
@@ -1,8 +1,21 @@
 /*
-# Licensed Materials - Property of IBM
-# Copyright IBM Corp. 2015,2016
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
 */
-
 package quarks.samples.topology;
 
 import java.util.HashSet;
@@ -136,8 +149,8 @@ public class CombiningStreamsProcessingResults {
                 .modify(tuple -> "Hypertensive Crisis!!!\n" + tuple)
                 .tag("hypertensive");
 
-        // Union two streams to obtain a single stream containing alerts from the normal and
-        // prehypertension alert streams
+        // Additional processing for these streams could go here. In this case, union two streams
+        // to obtain a single stream containing alerts from the normal and prehypertension alert streams.
         TStream<String> normalAndPrehypertensionAlerts = normalAlerts.union(prehypertensionAlerts);
 
         // Set of streams containing alerts from the other categories

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/240b4308/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
----------------------------------------------------------------------
diff --git a/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java b/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
index 421c3f6..9fa57d6 100644
--- a/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
+++ b/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
@@ -1,8 +1,21 @@
 /*
-# Licensed Materials - Property of IBM
-# Copyright IBM Corp. 2015,2016
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
 */
-
 package quarks.samples.utils.sensor;
 
 import java.util.HashMap;