You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/13 06:24:27 UTC
[5/6] incubator-streams git commit: STREAMS-211 | Responded to code
review feedback
STREAMS-211 | Responded to code review feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bf43cff4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bf43cff4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bf43cff4
Branch: refs/heads/master
Commit: bf43cff445ebd59d0b52da7d1f56dc5fdddb8d7c
Parents: 4b332d7
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Wed Nov 12 18:06:59 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Wed Nov 12 18:06:59 2014 -0600
----------------------------------------------------------------------
streams-components/streams-http/pom.xml | 2 +-
streams-monitoring/pom.xml | 29 ++++++++++++++++++++
.../tasks/BroadcastMonitorThread.java | 28 ++++++++++++++++++-
streams-pojo/pom.xml | 5 ----
.../local/builders/LocalStreamBuilder.java | 1 +
5 files changed, 58 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf43cff4/streams-components/streams-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/pom.xml b/streams-components/streams-http/pom.xml
index 39a4faa..f549729 100644
--- a/streams-components/streams-http/pom.xml
+++ b/streams-components/streams-http/pom.xml
@@ -68,7 +68,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
- <version>4.3.5</version>
+ <version>${httpcomponents.client.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf43cff4/streams-monitoring/pom.xml
----------------------------------------------------------------------
diff --git a/streams-monitoring/pom.xml b/streams-monitoring/pom.xml
index 51b5f16..028b173 100644
--- a/streams-monitoring/pom.xml
+++ b/streams-monitoring/pom.xml
@@ -63,6 +63,7 @@
</dependency>
</dependencies>
+
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
@@ -76,5 +77,33 @@
<directory>src/test/resources</directory>
</testResource>
</testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.pojo.json</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf43cff4/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 6e07619..cbae7ee 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -42,6 +42,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
private static MBeanServer server;
private long DEFAULT_WAIT_TIME = 30000;
+ private long waitTime;
private ObjectMapper objectMapper;
private Map<String, Object> streamConfig;
private String broadcastURI = null;
@@ -54,6 +55,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
server = ManagementFactory.getPlatformMBeanServer();
setBroadcastURI();
+ setWaitTime();
messagePersister = new BroadcastMessagePersister(broadcastURI);
initializeObjectMapper();
@@ -109,7 +111,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
}
messagePersister.persistMessages(messages);
- Thread.sleep(DEFAULT_WAIT_TIME);
+ Thread.sleep(waitTime);
} catch (InterruptedException e) {
LOGGER.error("Interrupted!: {}", e);
} catch (Exception e) {
@@ -130,6 +132,30 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
}
}
+ /**
+ * Go through streams config and set the thread's wait time (if present)
+ */
+ private void setWaitTime() {
+ try {
+ if (streamConfig != null &&
+ streamConfig.containsKey("monitoring_broadcast_interval_ms") &&
+ streamConfig.get("monitoring_broadcast_interval_ms") != null &&
+ streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long ||
+ streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer) {
+ waitTime = Long.parseLong(streamConfig.get("monitoring_broadcast_interval_ms").toString());
+ } else {
+ waitTime = DEFAULT_WAIT_TIME;
+ }
+
+ //Shutdown
+ if(waitTime == -1) {
+ this.keepRunning = false;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception while trying to set default broadcast thread wait time: {}", e);
+ }
+ }
+
public void shutdown() {
this.keepRunning = false;
LOGGER.debug("Shutting down BroadcastMonitor Thread");
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf43cff4/streams-pojo/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo/pom.xml b/streams-pojo/pom.xml
index 0b42b3f..57b82fa 100644
--- a/streams-pojo/pom.xml
+++ b/streams-pojo/pom.xml
@@ -189,11 +189,6 @@
<sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/verbs/share.json</sourcePath>
<sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/verbs/tag.json</sourcePath>
<sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/verbs/update.json</sourcePath>
- <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json</sourcePath>
- <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json</sourcePath>
- <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json</sourcePath>
- <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json</sourcePath>
- <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
<targetPackage>org.apache.streams.pojo.json</targetPackage>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf43cff4/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 47cb08f..856b815 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -49,6 +49,7 @@ public class LocalStreamBuilder implements StreamBuilder {
public static final String TIMEOUT_KEY = "TIMEOUT";
public static final String BROADCAST_KEY = "broadcastURI";
+ public static final String BROADCAST_INTERVAL_KEY = "monitoring_broadcast_interval_ms";
private Map<String, StreamComponent> providers;
private Map<String, StreamComponent> components;