You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/16 14:24:28 UTC
[1/3] incubator-quarks git commit: Add topology sample for combining
streams processing results
Repository: incubator-quarks
Updated Branches:
refs/heads/master 1228a4b01 -> 9d8338cf6
Add topology sample for combining streams processing results
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/399ea8d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/399ea8d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/399ea8d9
Branch: refs/heads/master
Commit: 399ea8d98031d9b012feed055537713b8294d3fd
Parents: 0fccf26
Author: Queenie Ma <qu...@gmail.com>
Authored: Tue Mar 15 15:10:31 2016 -0700
Committer: Queenie Ma <qu...@gmail.com>
Committed: Tue Mar 15 15:10:31 2016 -0700
----------------------------------------------------------------------
.../CombiningStreamsProcessingResults.java | 158 +++++++++++++++++++
.../utils/sensor/HeartMonitorSensor.java | 46 ++++++
2 files changed, 204 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/399ea8d9/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
----------------------------------------------------------------------
diff --git a/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java b/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
new file mode 100644
index 0000000..a28965a
--- /dev/null
+++ b/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
@@ -0,0 +1,158 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2015,2016
+*/
+
+package quarks.samples.topology;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import quarks.console.server.HttpServer;
+import quarks.function.ToIntFunction;
+import quarks.providers.development.DevelopmentProvider;
+import quarks.providers.direct.DirectProvider;
+import quarks.samples.utils.sensor.HeartMonitorSensor;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+/**
+ * Applying different processing against a set of streams and combining the
+ * resulting streams into a single stream.
+ *
+ * @see HeartMonitorSensor
+ */
+public class CombiningStreamsProcessingResults {
+ /**
+ * Polls a simulated heart monitor to periodically obtain blood pressure readings.
+ * Splits the readings by blood pressure category into separate streams.
+ * Applies different processing on each stream to generate alert streams.
+ * Combines the alert streams into a single stream and prints the alerts.
+ *
+ */
+ public static void main(String[] args) throws Exception {
+ HeartMonitorSensor monitor = new HeartMonitorSensor();
+
+ DirectProvider dp = new DevelopmentProvider();
+
+ System.out.println(dp.getServices().getService(HttpServer.class).getConsoleUrl());
+
+ Topology top = dp.newTopology("heartMonitor");
+
+ // Generate a stream of heart monitor readings
+ TStream<Map<String, Integer>> readings = top
+ .poll(monitor, 1, TimeUnit.MILLISECONDS)
+ .filter(tuple -> tuple.get("Systolic") > 50 && tuple.get("Diastolic") > 30)
+ .filter(tuple -> tuple.get("Systolic") < 200 && tuple.get("Diastolic") < 130);
+
+ // Split the stream by blood pressure category
+ List<TStream<Map<String, Integer>>> categories = readings.split(6, new ToIntFunction<Map<String, Integer>>() {
+ @Override
+ public int applyAsInt(Map<String, Integer> tuple) {
+ if (tuple.get("Systolic") < 120 && tuple.get("Diastolic") < 80) {
+ // Normal
+ return 0;
+ } else if ((tuple.get("Systolic") >= 120 && tuple.get("Systolic") <= 139) ||
+ (tuple.get("Diastolic") >= 80 && tuple.get("Diastolic") <= 89)) {
+ // Prehypertension
+ return 1;
+ } else if ((tuple.get("Systolic") >= 140 && tuple.get("Systolic") <= 159) ||
+ (tuple.get("Diastolic") >= 90 && tuple.get("Diastolic") <= 99)) {
+ // High Blood Pressure (Hypertension) Stage 1
+ return 2;
+ } else if ((tuple.get("Systolic") >= 160 && tuple.get("Systolic") <= 179) ||
+ (tuple.get("Diastolic") >= 100 && tuple.get("Diastolic") <= 109)) {
+ // High Blood Pressure (Hypertension) Stage 2
+ return 3;
+ } else if (tuple.get("Systolic") >= 180 && tuple.get("Diastolic") >= 110) {
+ // Hypertensive Crisis
+ return 4;
+ } else {
+ // Invalid
+ return -1;
+ }
+ }
+ });
+
+ // Get each individual stream
+ TStream<Map<String, Integer>> normal = categories.get(0).tag("normal");
+ TStream<Map<String, Integer>> prehypertension = categories.get(1).tag("prehypertension");
+ TStream<Map<String, Integer>> hypertension_stage1 = categories.get(2).tag("hypertension_stage1");
+ TStream<Map<String, Integer>> hypertension_stage2 = categories.get(3).tag("hypertension_stage2");
+ TStream<Map<String, Integer>> hypertensive = categories.get(4).tag("hypertensive");
+
+ // Perform analytics on each stream and generate alerts for each blood pressure category
+
+ // Category: Normal
+ TStream<String> normalAlerts = normal
+ .filter(tuple -> tuple.get("Systolic") > 80 && tuple.get("Diastolic") > 50)
+ .tag("normal")
+ .map(tuple -> {
+ return "All is normal. BP is " + tuple.get("Systolic") + "/" +
+ tuple.get("Diastolic") + ".\n"; })
+ .tag("normal");
+
+ // Category: Prehypertension category
+ TStream<String> prehypertensionAlerts = prehypertension
+ .map(tuple -> {
+ return "At high risk for developing hypertension. BP is " +
+ tuple.get("Systolic") + "/" + tuple.get("Diastolic") + ".\n"; })
+ .tag("prehypertension");
+
+ // Category: High Blood Pressure (Hypertension) Stage 1
+ TStream<String> hypertension_stage1Alerts = hypertension_stage1
+ .map(tuple -> {
+ return "Monitor closely, patient has high blood pressure. " +
+ "BP is " + tuple.get("Systolic") + "/" + tuple.get("Diastolic") + ".\n"; })
+ .tag("hypertension_stage1")
+ .modify(tuple -> "High Blood Pressure (Hypertension) Stage 1\n" + tuple)
+ .tag("hypertension_stage1");
+
+ // Category: High Blood Pressure (Hypertension) Stage 2
+ TStream<String> hypertension_stage2Alerts = hypertension_stage2
+ .filter(tuple -> tuple.get("Systolic") >= 170 && tuple.get("Diastolic") >= 105)
+ .tag("hypertension_stage2")
+ .peek(tuple ->
+ System.out.println("BP: " + tuple.get("Systolic") + "/" + tuple.get("Diastolic")))
+ .map(tuple -> {
+ return "Warning! Monitor closely, patient is at risk of a hypertensive crisis!\n"; })
+ .tag("hypertension_stage2")
+ .modify(tuple -> "High Blood Pressure (Hypertension) Stage 2\n" + tuple)
+ .tag("hypertension_stage2");
+
+ // Category: Hypertensive Crisis
+ TStream<String> hypertensiveAlerts = hypertensive
+ .filter(tuple -> tuple.get("Systolic") >= 180)
+ .tag("hypertensive")
+ .peek(tuple ->
+ System.out.println("BP: " + tuple.get("Systolic") + "/" + tuple.get("Diastolic")))
+ .map(tuple -> { return "Emergency! See to patient immediately!\n"; })
+ .tag("hypertensive")
+ .modify(tuple -> tuple.toUpperCase())
+ .tag("hypertensive")
+ .modify(tuple -> "Hypertensive Crisis!!!\n" + tuple)
+ .tag("hypertensive");
+
+ // Union two streams to obtain a single stream containing alerts from the normal and
+ // prehypertension alert streams
+ TStream<String> normalAndPrehypertensionAlerts = normalAlerts.union(prehypertensionAlerts);
+
+ // Set of streams containing alerts from the other categories
+ Set<TStream<String>> otherAlerts = new HashSet<>();
+ otherAlerts.add(hypertension_stage1Alerts);
+ otherAlerts.add(hypertension_stage2Alerts);
+ otherAlerts.add(hypertensiveAlerts);
+
+ // Union a stream with a set of streams to obtain a single stream containing alerts from
+ // all alert streams
+ TStream<String> allAlerts = normalAndPrehypertensionAlerts.union(otherAlerts);
+
+ // Terminate the stream by printing out alerts from all categories
+ allAlerts.sink(tuple -> System.out.println(tuple));
+
+ dp.submit(top);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/399ea8d9/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
----------------------------------------------------------------------
diff --git a/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java b/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
new file mode 100644
index 0000000..421c3f6
--- /dev/null
+++ b/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
@@ -0,0 +1,46 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2015,2016
+*/
+
+package quarks.samples.utils.sensor;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import quarks.function.Supplier;
+
+/**
+ * Streams of simulated heart monitor sensors.
+ *
+ */
+public class HeartMonitorSensor implements Supplier<Map<String,Integer>> {
+ // Initial blood pressure
+ public Integer currentSystolic = 115;
+ public Integer currentDiastolic = 75;
+ Random rand;
+
+ public HeartMonitorSensor() {
+ rand = new Random();
+ }
+
+ /**
+ * Every call to this method returns a map containing a random systolic
+ * pressure and a random diastolic pressure.
+ */
+ @Override
+ public Map<String, Integer> get() {
+ // Change the current pressure by some random amount between -2 and 2
+ Integer newSystolic = rand.nextInt(2 + 1 + 2) - 2 + currentSystolic;
+ currentSystolic = newSystolic;
+
+ Integer newDiastolic = rand.nextInt(2 + 1 + 2) - 2 + currentDiastolic;
+ currentDiastolic = newDiastolic;
+
+ Map<String, Integer> pressures = new HashMap<String, Integer>();
+ pressures.put("Systolic", currentSystolic);
+ pressures.put("Diastolic", currentDiastolic);
+ return pressures;
+ }
+}
[3/3] incubator-quarks git commit: Merge branch 'pr-14'
Posted by dj...@apache.org.
Merge branch 'pr-14'
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/9d8338cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/9d8338cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/9d8338cf
Branch: refs/heads/master
Commit: 9d8338cf6fda15ea5d05e92ad37e4b4a57e980d1
Parents: 1228a4b 240b430
Author: Daniel J. Debrunner <de...@us.ibm.com>
Authored: Wed Mar 16 06:23:42 2016 -0700
Committer: Daniel J. Debrunner <de...@us.ibm.com>
Committed: Wed Mar 16 06:23:42 2016 -0700
----------------------------------------------------------------------
.../CombiningStreamsProcessingResults.java | 171 +++++++++++++++++++
.../utils/sensor/HeartMonitorSensor.java | 59 +++++++
2 files changed, 230 insertions(+)
----------------------------------------------------------------------
[2/3] incubator-quarks git commit: Add ASF license headers and add
reasoning for a line of code
Posted by dj...@apache.org.
Add ASF license headers and add reasoning for a line of code
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/240b4308
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/240b4308
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/240b4308
Branch: refs/heads/master
Commit: 240b4308a56274ac22ac7c7f046ad4f2a94a29f0
Parents: 399ea8d
Author: Queenie Ma <qu...@gmail.com>
Authored: Tue Mar 15 16:01:39 2016 -0700
Committer: Queenie Ma <qu...@gmail.com>
Committed: Tue Mar 15 16:01:39 2016 -0700
----------------------------------------------------------------------
.../CombiningStreamsProcessingResults.java | 23 +++++++++++++++-----
.../utils/sensor/HeartMonitorSensor.java | 19 +++++++++++++---
2 files changed, 34 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/240b4308/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
----------------------------------------------------------------------
diff --git a/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java b/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
index a28965a..6bcebe6 100644
--- a/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
+++ b/samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
@@ -1,8 +1,21 @@
/*
-# Licensed Materials - Property of IBM
-# Copyright IBM Corp. 2015,2016
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
*/
-
package quarks.samples.topology;
import java.util.HashSet;
@@ -136,8 +149,8 @@ public class CombiningStreamsProcessingResults {
.modify(tuple -> "Hypertensive Crisis!!!\n" + tuple)
.tag("hypertensive");
- // Union two streams to obtain a single stream containing alerts from the normal and
- // prehypertension alert streams
+ // Additional processing for these streams could go here. In this case, union two streams
+ // to obtain a single stream containing alerts from the normal and prehypertension alert streams.
TStream<String> normalAndPrehypertensionAlerts = normalAlerts.union(prehypertensionAlerts);
// Set of streams containing alerts from the other categories
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/240b4308/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
----------------------------------------------------------------------
diff --git a/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java b/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
index 421c3f6..9fa57d6 100644
--- a/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
+++ b/samples/utils/src/main/java/quarks/samples/utils/sensor/HeartMonitorSensor.java
@@ -1,8 +1,21 @@
/*
-# Licensed Materials - Property of IBM
-# Copyright IBM Corp. 2015,2016
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
*/
-
package quarks.samples.utils.sensor;
import java.util.HashMap;