You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by ho...@apache.org on 2016/03/09 00:51:32 UTC
incubator-quarks git commit: updated ConsoleWaterDetector example,
one bug with index.js
Repository: incubator-quarks
Updated Branches:
refs/heads/update_console_sample [created] 4737c0510
updated ConsoleWaterDetector example, one bug with index.js
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/4737c051
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/4737c051
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/4737c051
Branch: refs/heads/update_console_sample
Commit: 4737c0510a52a1aa51c42e5c0bc7ee63aa0f97d8
Parents: 6ddf684
Author: Susan L. Cline <ho...@apache.org>
Authored: Tue Mar 8 15:47:04 2016 -0800
Committer: Susan L. Cline <ho...@apache.org>
Committed: Tue Mar 8 15:47:04 2016 -0800
----------------------------------------------------------------------
console/servlets/webapp_content/js/index.js | 1 -
.../samples/console/ConsoleWaterDetector.java | 95 ++++++++++++++------
2 files changed, 69 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4737c051/console/servlets/webapp_content/js/index.js
----------------------------------------------------------------------
diff --git a/console/servlets/webapp_content/js/index.js b/console/servlets/webapp_content/js/index.js
index 2b49332..3fe05ec 100644
--- a/console/servlets/webapp_content/js/index.js
+++ b/console/servlets/webapp_content/js/index.js
@@ -631,7 +631,6 @@ var renderGraph = function(jobId, counterMetrics, bIsNewJob) {
var targetStreams = [];
d.sourceLinks.forEach(function(src){
- targets.push(src.targetIdx.id.toString());
targets.push(src.targetIdx.id);
if (src.tags && src.tags.length > 0) {
targetStreams = src.tags;
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4737c051/samples/console/src/main/java/quarks/samples/console/ConsoleWaterDetector.java
----------------------------------------------------------------------
diff --git a/samples/console/src/main/java/quarks/samples/console/ConsoleWaterDetector.java b/samples/console/src/main/java/quarks/samples/console/ConsoleWaterDetector.java
index f9b4231..b39be3d 100644
--- a/samples/console/src/main/java/quarks/samples/console/ConsoleWaterDetector.java
+++ b/samples/console/src/main/java/quarks/samples/console/ConsoleWaterDetector.java
@@ -6,9 +6,14 @@ package quarks.samples.console;
import java.util.HashSet;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@@ -25,12 +30,11 @@ import quarks.topology.Topology;
*
* Demonstrates some of the features of the console.
* <P>
- * The topology graph in the console currently allows for 4 distinct "views" of the topology:
+ * The topology graph in the console currently allows for 3 distinct "views" of the topology:
* <ul>
* <li>Static flow</li>
* <li>Tuple count</li>
* <li>Oplet kind</li>
- * <li>Stream tags</li>
* </ul>
* </P>
* <P>
@@ -43,10 +47,6 @@ import quarks.topology.Topology;
* The "Oplet kind" view colors the oplets or vertices displayed in the topology graph (the circles) by their
* corresponding Oplet kind.
* </P>
- * <P>
- * The "Stream tags" view colors the "edges" or "streams" according to the selected stream tag. Stream tags must be
- * added by the user in order for this option to appear enabled in the console.
- * </P>
* If "Tuple count" is selected the legend reflects ranges of tuple counts measured since the application was started.
* </P>
* <P>
@@ -147,11 +147,11 @@ public class ConsoleWaterDetector {
System.out.println(dp.getServices().getService(HttpServer.class).getConsoleUrl());
- Topology simpleTopology = dp.newTopology("ConsoleWaterDetector");
+ Topology wellTopology = dp.newTopology("ConsoleWaterDetector");
- TStream<JsonObject> well1 = waterDetector(simpleTopology, 1);
- TStream<JsonObject> well2 = waterDetector(simpleTopology, 2);
- TStream<JsonObject> well3 = waterDetector(simpleTopology, 3);
+ TStream<JsonObject> well1 = waterDetector(wellTopology, 1);
+ TStream<JsonObject> well2 = waterDetector(wellTopology, 2);
+ TStream<JsonObject> well3 = waterDetector(wellTopology, 3);
TStream<JsonObject> filteredReadings1 = alertFilter(well1, 1, false);
TStream<JsonObject> filteredReadings2 = alertFilter(well2, 2, true);
@@ -168,21 +168,45 @@ public class ConsoleWaterDetector {
List<TStream<JsonObject>> individualAlerts2 = splitAlert(filteredReadings2, 2);
- individualAlerts2.get(0).tag("well2").sink(tuple -> System.out.println("\n" + formatAlertOutput(tuple, "2", "temp")));
- individualAlerts2.get(1).tag("well2").sink(tuple -> System.out.println(formatAlertOutput(tuple, "2", "acidity")));
- individualAlerts2.get(2).tag("well2").sink(tuple -> System.out.println(formatAlertOutput(tuple, "2", "ecoli")));
- individualAlerts2.get(3).tag("well2").sink(tuple -> System.out.println(formatAlertOutput(tuple, "2", "lead")));
+ TStream<JsonObject> alert0Well2 = individualAlerts2.get(0);
+ alert0Well2 = Metrics.counter(alert0Well2);
+ alert0Well2.tag("well2", "temp");
+
+ TStream<JsonObject> alert1Well2 = individualAlerts2.get(1);
+ alert1Well2 = Metrics.counter(alert1Well2);
+ alert1Well2.tag("well2", "acidity");
+
+ TStream<JsonObject> alert2Well2 = individualAlerts2.get(2);
+ alert2Well2 = Metrics.counter(alert2Well2);
+ alert2Well2.tag("well2", "ecoli");
+ TStream<JsonObject> alert3Well2 = individualAlerts2.get(3);
+ alert3Well2 = Metrics.counter(alert3Well2);
+ alert3Well2.tag("well2", "lead");
+
List<TStream<JsonObject>> individualAlerts3 = splitAlert(filteredReadings3, 3);
// Put a rate meter on well3's temperature sensor output
Metrics.rateMeter(individualAlerts3.get(0));
- individualAlerts3.get(0).tag(TEMP_ALERT_TAG, "well3").sink(tuple -> System.out.println("\n" + formatAlertOutput(tuple, "3", "temp")));
+ individualAlerts3.get(0).tag(TEMP_ALERT_TAG, "well3").sink(tuple -> System.out.println(formatAlertOutput(tuple, "3", "temp")));
individualAlerts3.get(1).tag(ACIDITY_ALERT_TAG, "well3").sink(tuple -> System.out.println(formatAlertOutput(tuple, "3", "acidity")));
individualAlerts3.get(2).tag(ECOLI_ALERT_TAG, "well3").sink(tuple -> System.out.println(formatAlertOutput(tuple, "3", "ecoli")));
individualAlerts3.get(3).tag(LEAD_ALERT_TAG, "well3").sink(tuple -> System.out.println(formatAlertOutput(tuple, "3", "lead")));
- dp.submit(simpleTopology);
+ dp.submit(wellTopology);
+
+ while (true) {
+ MetricRegistry metricRegistry = dp.getServices().getService(MetricRegistry.class);
+ SortedMap<String, Counter> counters = metricRegistry.getCounters();
+
+ Set<Entry<String, Counter>> values = counters.entrySet();
+ for (Entry<String, Counter> e : values) {
+ if (e.getValue().getCount() == 0) {
+ System.out.println("Counter Op:" + e.getKey() + " has a tuple count of zero!");
+ }
+ }
+ Thread.sleep(2000);
+ }
}
/**
@@ -301,28 +325,47 @@ public class ConsoleWaterDetector {
* @param alertStream The TStream<JsonObject> that we know has some out of range condition - it could be temp, acidity, ecoli or lead
* - or all of them
* @param wellId The id of the well that has the out of range readings
- * @return List<TStream<JsonObject>> - one for each sensor. Some of these readings may be in range since the incoming
- * stream is a composite of the readings
+ * @return List<TStream<JsonObject>> - one for each sensor.
*/
public static List<TStream<JsonObject>> splitAlert(TStream<JsonObject> alertStream, int wellId) {
List<TStream<JsonObject>> allStreams = alertStream.split(5, tuple -> {
if (tuple.get("temp") != null) {
JsonObject tempObj = new JsonObject();
- tempObj.addProperty("temp", tuple.get("temp").getAsInt());
- return 0;
+ int temp = tuple.get("temp").getAsInt();
+ if (temp <= TEMP_ALERT_MIN || temp >= TEMP_ALERT_MAX) {
+ tempObj.addProperty("temp", temp);
+ return 0;
+ } else {
+ return -1;
+ }
} else if (tuple.get("acidity") != null){
JsonObject acidObj = new JsonObject();
- acidObj.addProperty("acidity", tuple.get("acidity").getAsInt());
- return 1;
+ int acid = tuple.get("acidity").getAsInt();
+ if (acid <= ACIDITY_ALERT_MIN || acid >= ACIDITY_ALERT_MAX) {
+ acidObj.addProperty("acidity", acid);
+ return 1;
+ } else {
+ return -1;
+ }
} else if (tuple.get("ecoli") != null) {
JsonObject ecoliObj = new JsonObject();
- ecoliObj.addProperty("ecoli", tuple.get("ecoli").getAsInt());
- return 2;
+ int ecoli = tuple.get("ecoli").getAsInt();
+ if (ecoli >= ECOLI_ALERT) {
+ ecoliObj.addProperty("ecoli", ecoli);
+ return 2;
+ } else {
+ return -1;
+ }
} else if (tuple.get("lead") != null) {
JsonObject leadObj = new JsonObject();
- leadObj.addProperty("lead", tuple.get("lead").getAsInt());
- return 3;
+ int lead = tuple.get("lead").getAsInt();
+ if (lead >= LEAD_ALERT_MAX) {
+ leadObj.addProperty("lead", lead);
+ return 3;
+ } else {
+ return -1;
+ }
} else {
return -1;
}