You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/19 19:58:12 UTC

[3/5] incubator-streams git commit: Merge branch 'master' into STREAMS-216

Merge branch 'master' into STREAMS-216

Conflicts:
	streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
	streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9a77a8f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9a77a8f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9a77a8f7

Branch: refs/heads/STREAMS-216
Commit: 9a77a8f7c29ccffc8a71b1a4d7974e7904d98651
Parents: 966b12b dc7ba80
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Thu Nov 13 12:46:56 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Thu Nov 13 12:46:56 2014 -0600

----------------------------------------------------------------------
 streams-components/streams-http/pom.xml         |  2 +-
 streams-monitoring/pom.xml                      | 29 +++++++
 .../jackson/DatumStatusCounterDeserializer.java | 76 +++++++++++++++++
 .../jackson/MemoryUsageDeserializer.java        | 79 ++++++++++++++++++
 .../jackson/StreamsTaskCounterDeserializer.java | 88 ++++++++++++++++++++
 .../jackson/ThroughputQueueDeserializer.java    | 87 +++++++++++++++++++
 .../tasks/BroadcastMonitorThread.java           | 30 ++++++-
 .../org/apache/streams/pojo/json/Broadcast.json | 17 ++++
 .../pojo/json/DatumStatusCounterBroadcast.json  | 22 +++++
 .../streams/pojo/json/MemoryUsageBroadcast.json | 30 +++++++
 .../pojo/json/StreamsTaskCounterBroadcast.json  | 38 +++++++++
 .../pojo/json/ThroughputQueueBroadcast.json     | 38 +++++++++
 .../jackson/MemoryUsageDeserializerTest.java    | 77 +++++++++++++++++
 .../src/test/resources/MemoryUsageObjects.json  |  1 +
 streams-pojo/pom.xml                            | 10 ---
 .../jackson/DatumStatusCounterDeserializer.java | 76 -----------------
 .../jackson/MemoryUsageDeserializer.java        | 79 ------------------
 .../jackson/StreamsTaskCounterDeserializer.java | 88 --------------------
 .../jackson/ThroughputQueueDeserializer.java    | 87 -------------------
 .../org/apache/streams/pojo/json/Broadcast.json | 17 ----
 .../pojo/json/DatumStatusCounterBroadcast.json  | 22 -----
 .../streams/pojo/json/MemoryUsageBroadcast.json | 30 -------
 .../pojo/json/StreamsTaskCounterBroadcast.json  | 38 ---------
 .../pojo/json/ThroughputQueueBroadcast.json     | 38 ---------
 .../jackson/MemoryUsageDeserializerTest.java    | 77 -----------------
 .../src/test/resources/MemoryUsageObjects.json  |  1 -
 .../local/builders/LocalStreamBuilder.java      |  1 +
 27 files changed, 612 insertions(+), 566 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a77a8f7/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --cc streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 40e4cfe,cbae7ee..25a5030
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@@ -55,8 -55,7 +56,9 @@@ public class BroadcastMonitorThread ext
          server = ManagementFactory.getPlatformMBeanServer();
  
          setBroadcastURI();
 +        setStreamName();
+         setWaitTime();
 +
          messagePersister = new BroadcastMessagePersister(broadcastURI);
  
          initializeObjectMapper();
@@@ -134,17 -132,30 +136,41 @@@
          }
      }
  
 +    private void setStreamName() {
-         if(streamConfig != null &&
++        if (streamConfig != null &&
 +                streamConfig.containsKey("streamsID") &&
 +                streamConfig.get("streamsID") != null &&
 +                streamConfig.get("streamsID") instanceof String) {
 +            streamName = streamConfig.get("streamsID").toString();
 +        } else {
 +            streamName = "{\"streamName\":\"Unknown Stream\"}";
 +        }
 +    }
 +
+     /**
+      * Go through streams config and set the thread's wait time (if present)
+      */
+     private void setWaitTime() {
+         try {
+             if (streamConfig != null &&
+                     streamConfig.containsKey("monitoring_broadcast_interval_ms") &&
+                     streamConfig.get("monitoring_broadcast_interval_ms") != null &&
+                     streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long ||
+                     streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer) {
+                 waitTime = Long.parseLong(streamConfig.get("monitoring_broadcast_interval_ms").toString());
+             } else {
+                 waitTime = DEFAULT_WAIT_TIME;
+             }
+ 
+             //Shutdown
+             if(waitTime == -1) {
+                 this.keepRunning = false;
+             }
+         } catch (Exception e) {
+             LOGGER.error("Exception while trying to set default broadcast thread wait time: {}", e);
+         }
+     }
+ 
      public void shutdown() {
          this.keepRunning = false;
          LOGGER.debug("Shutting down BroadcastMonitor Thread");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a77a8f7/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
----------------------------------------------------------------------
diff --cc streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
index 0000000,4d7f87b..94ec147
mode 000000,100644..100644
--- a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
@@@ -1,0 -1,13 +1,17 @@@
+ {
+   "type" : "object",
+   "title" : "object",
+   "javaType": "org.apache.streams.pojo.json.Broadcast",
+   "javaInterfaces": ["java.io.Serializable"],
+   "description" : "Base Broadcast class",
+   "properties" : {
+     "name": {
+       "type": "string",
+       "description": "Name of the MBean"
++    },
++    "streamIdentifier": {
++      "type": "string",
++      "description": "The name of the Stream that is currently executing"
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a77a8f7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --cc streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 26e1b27,856b815..e3e42ff
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@@ -49,7 -49,7 +49,8 @@@ public class LocalStreamBuilder impleme
  
      public static final String TIMEOUT_KEY = "TIMEOUT";
      public static final String BROADCAST_KEY = "broadcastURI";
 +    public static final String STREAM_IDENTIFIER_KEY = "streamsID";
+     public static final String BROADCAST_INTERVAL_KEY = "monitoring_broadcast_interval_ms";
  
      private Map<String, StreamComponent> providers;
      private Map<String, StreamComponent> components;