You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/13 06:24:27 UTC

[5/6] incubator-streams git commit: STREAMS-211 | Responded to code review feedback

STREAMS-211 | Responded to code review feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bf43cff4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bf43cff4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bf43cff4

Branch: refs/heads/master
Commit: bf43cff445ebd59d0b52da7d1f56dc5fdddb8d7c
Parents: 4b332d7
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Wed Nov 12 18:06:59 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Wed Nov 12 18:06:59 2014 -0600

----------------------------------------------------------------------
 streams-components/streams-http/pom.xml         |  2 +-
 streams-monitoring/pom.xml                      | 29 ++++++++++++++++++++
 .../tasks/BroadcastMonitorThread.java           | 28 ++++++++++++++++++-
 streams-pojo/pom.xml                            |  5 ----
 .../local/builders/LocalStreamBuilder.java      |  1 +
 5 files changed, 58 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf43cff4/streams-components/streams-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/pom.xml b/streams-components/streams-http/pom.xml
index 39a4faa..f549729 100644
--- a/streams-components/streams-http/pom.xml
+++ b/streams-components/streams-http/pom.xml
@@ -68,7 +68,7 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>4.3.5</version>
+            <version>${httpcomponents.client.version}</version>
         </dependency>
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf43cff4/streams-monitoring/pom.xml
----------------------------------------------------------------------
diff --git a/streams-monitoring/pom.xml b/streams-monitoring/pom.xml
index 51b5f16..028b173 100644
--- a/streams-monitoring/pom.xml
+++ b/streams-monitoring/pom.xml
@@ -63,6 +63,7 @@
         </dependency>
     </dependencies>
 
+
     <build>
         <sourceDirectory>src/main/java</sourceDirectory>
         <testSourceDirectory>src/test/java</testSourceDirectory>
@@ -76,5 +77,33 @@
                 <directory>src/test/resources</directory>
             </testResource>
         </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.pojo.json</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf43cff4/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 6e07619..cbae7ee 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -42,6 +42,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
     private static MBeanServer server;
 
     private long DEFAULT_WAIT_TIME = 30000;
+    private long waitTime;
     private ObjectMapper objectMapper;
     private Map<String, Object> streamConfig;
     private String broadcastURI = null;
@@ -54,6 +55,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
         server = ManagementFactory.getPlatformMBeanServer();
 
         setBroadcastURI();
+        setWaitTime();
         messagePersister = new BroadcastMessagePersister(broadcastURI);
 
         initializeObjectMapper();
@@ -109,7 +111,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
                 }
 
                 messagePersister.persistMessages(messages);
-                Thread.sleep(DEFAULT_WAIT_TIME);
+                Thread.sleep(waitTime);
             } catch (InterruptedException e) {
                 LOGGER.error("Interrupted!: {}", e);
             } catch (Exception e) {
@@ -130,6 +132,30 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
         }
     }
 
+    /**
+     * Go through streams config and set the thread's wait time (if present)
+     */
+    private void setWaitTime() {
+        try {
+            if (streamConfig != null &&
+                    streamConfig.containsKey("monitoring_broadcast_interval_ms") &&
+                    streamConfig.get("monitoring_broadcast_interval_ms") != null &&
+                    streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long ||
+                    streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer) {
+                waitTime = Long.parseLong(streamConfig.get("monitoring_broadcast_interval_ms").toString());
+            } else {
+                waitTime = DEFAULT_WAIT_TIME;
+            }
+
+            //Shutdown
+            if(waitTime == -1) {
+                this.keepRunning = false;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Exception while trying to set default broadcast thread wait time: {}", e);
+        }
+    }
+
     public void shutdown() {
         this.keepRunning = false;
         LOGGER.debug("Shutting down BroadcastMonitor Thread");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf43cff4/streams-pojo/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo/pom.xml b/streams-pojo/pom.xml
index 0b42b3f..57b82fa 100644
--- a/streams-pojo/pom.xml
+++ b/streams-pojo/pom.xml
@@ -189,11 +189,6 @@
                         <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/verbs/share.json</sourcePath>
                         <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/verbs/tag.json</sourcePath>
                         <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/verbs/update.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json</sourcePath>
                     </sourcePaths>
                     <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
                     <targetPackage>org.apache.streams.pojo.json</targetPackage>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf43cff4/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 47cb08f..856b815 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -49,6 +49,7 @@ public class LocalStreamBuilder implements StreamBuilder {
 
     public static final String TIMEOUT_KEY = "TIMEOUT";
     public static final String BROADCAST_KEY = "broadcastURI";
+    public static final String BROADCAST_INTERVAL_KEY = "monitoring_broadcast_interval_ms";
 
     private Map<String, StreamComponent> providers;
     private Map<String, StreamComponent> components;