You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/19 19:58:12 UTC
[3/5] incubator-streams git commit: Merge branch 'master' into
STREAMS-216
Merge branch 'master' into STREAMS-216
Conflicts:
streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9a77a8f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9a77a8f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9a77a8f7
Branch: refs/heads/STREAMS-216
Commit: 9a77a8f7c29ccffc8a71b1a4d7974e7904d98651
Parents: 966b12b dc7ba80
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Thu Nov 13 12:46:56 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Thu Nov 13 12:46:56 2014 -0600
----------------------------------------------------------------------
streams-components/streams-http/pom.xml | 2 +-
streams-monitoring/pom.xml | 29 +++++++
.../jackson/DatumStatusCounterDeserializer.java | 76 +++++++++++++++++
.../jackson/MemoryUsageDeserializer.java | 79 ++++++++++++++++++
.../jackson/StreamsTaskCounterDeserializer.java | 88 ++++++++++++++++++++
.../jackson/ThroughputQueueDeserializer.java | 87 +++++++++++++++++++
.../tasks/BroadcastMonitorThread.java | 30 ++++++-
.../org/apache/streams/pojo/json/Broadcast.json | 17 ++++
.../pojo/json/DatumStatusCounterBroadcast.json | 22 +++++
.../streams/pojo/json/MemoryUsageBroadcast.json | 30 +++++++
.../pojo/json/StreamsTaskCounterBroadcast.json | 38 +++++++++
.../pojo/json/ThroughputQueueBroadcast.json | 38 +++++++++
.../jackson/MemoryUsageDeserializerTest.java | 77 +++++++++++++++++
.../src/test/resources/MemoryUsageObjects.json | 1 +
streams-pojo/pom.xml | 10 ---
.../jackson/DatumStatusCounterDeserializer.java | 76 -----------------
.../jackson/MemoryUsageDeserializer.java | 79 ------------------
.../jackson/StreamsTaskCounterDeserializer.java | 88 --------------------
.../jackson/ThroughputQueueDeserializer.java | 87 -------------------
.../org/apache/streams/pojo/json/Broadcast.json | 17 ----
.../pojo/json/DatumStatusCounterBroadcast.json | 22 -----
.../streams/pojo/json/MemoryUsageBroadcast.json | 30 -------
.../pojo/json/StreamsTaskCounterBroadcast.json | 38 ---------
.../pojo/json/ThroughputQueueBroadcast.json | 38 ---------
.../jackson/MemoryUsageDeserializerTest.java | 77 -----------------
.../src/test/resources/MemoryUsageObjects.json | 1 -
.../local/builders/LocalStreamBuilder.java | 1 +
27 files changed, 612 insertions(+), 566 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a77a8f7/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --cc streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 40e4cfe,cbae7ee..25a5030
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@@ -55,8 -55,7 +56,9 @@@ public class BroadcastMonitorThread ext
server = ManagementFactory.getPlatformMBeanServer();
setBroadcastURI();
+ setStreamName();
+ setWaitTime();
+
messagePersister = new BroadcastMessagePersister(broadcastURI);
initializeObjectMapper();
@@@ -134,17 -132,30 +136,41 @@@
}
}
+ private void setStreamName() {
- if(streamConfig != null &&
++ if (streamConfig != null &&
+ streamConfig.containsKey("streamsID") &&
+ streamConfig.get("streamsID") != null &&
+ streamConfig.get("streamsID") instanceof String) {
+ streamName = streamConfig.get("streamsID").toString();
+ } else {
+ streamName = "{\"streamName\":\"Unknown Stream\"}";
+ }
+ }
+
+ /**
+ * Go through streams config and set the thread's wait time (if present)
+ */
+ private void setWaitTime() {
+ try {
+ if (streamConfig != null &&
+ streamConfig.containsKey("monitoring_broadcast_interval_ms") &&
+ streamConfig.get("monitoring_broadcast_interval_ms") != null &&
+ streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long ||
+ streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer) {
+ waitTime = Long.parseLong(streamConfig.get("monitoring_broadcast_interval_ms").toString());
+ } else {
+ waitTime = DEFAULT_WAIT_TIME;
+ }
+
+ //Shutdown
+ if(waitTime == -1) {
+ this.keepRunning = false;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception while trying to set default broadcast thread wait time: {}", e);
+ }
+ }
+
public void shutdown() {
this.keepRunning = false;
LOGGER.debug("Shutting down BroadcastMonitor Thread");
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a77a8f7/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
----------------------------------------------------------------------
diff --cc streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
index 0000000,4d7f87b..94ec147
mode 000000,100644..100644
--- a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
@@@ -1,0 -1,13 +1,17 @@@
+ {
+ "type" : "object",
+ "title" : "object",
+ "javaType": "org.apache.streams.pojo.json.Broadcast",
+ "javaInterfaces": ["java.io.Serializable"],
+ "description" : "Base Broadcast class",
+ "properties" : {
+ "name": {
+ "type": "string",
+ "description": "Name of the MBean"
++ },
++ "streamIdentifier": {
++ "type": "string",
++ "description": "The name of the Stream that is currently executing"
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a77a8f7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --cc streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 26e1b27,856b815..e3e42ff
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@@ -49,7 -49,7 +49,8 @@@ public class LocalStreamBuilder impleme
public static final String TIMEOUT_KEY = "TIMEOUT";
public static final String BROADCAST_KEY = "broadcastURI";
+ public static final String STREAM_IDENTIFIER_KEY = "streamsID";
+ public static final String BROADCAST_INTERVAL_KEY = "monitoring_broadcast_interval_ms";
private Map<String, StreamComponent> providers;
private Map<String, StreamComponent> components;