You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:24:56 UTC
[15/42] incubator-streams git commit: STREAMS-440: custom
checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
index e4d883d..35dbcd5 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
@@ -15,73 +15,78 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.jackson;
+import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
+
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
import org.slf4j.Logger;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
public class ThroughputQueueDeserializer extends JsonDeserializer<ThroughputQueueBroadcast> {
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThroughputQueueDeserializer.class);
- public ThroughputQueueDeserializer() {
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThroughputQueueDeserializer.class);
- }
+ public ThroughputQueueDeserializer() {
- @Override
- public ThroughputQueueBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
- try {
- MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ }
- ThroughputQueueBroadcast throughputQueueBroadcast = new ThroughputQueueBroadcast();
- JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+ @Override
+ public ThroughputQueueBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+ try {
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
- ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
- MBeanInfo info = server.getMBeanInfo(name);
- throughputQueueBroadcast.setName(name.toString());
+ ThroughputQueueBroadcast throughputQueueBroadcast = new ThroughputQueueBroadcast();
+ JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
- for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
- try {
- switch(attribute.getName()) {
- case "CurrentSize":
- throughputQueueBroadcast.setCurrentSize((long) server.getAttribute(name, attribute.getName()));
- break;
- case "AvgWait":
- throughputQueueBroadcast.setAvgWait((double) server.getAttribute(name, attribute.getName()));
- break;
- case "MaxWait":
- throughputQueueBroadcast.setMaxWait((long) server.getAttribute(name, attribute.getName()));
- break;
- case "Removed":
- throughputQueueBroadcast.setRemoved((long) server.getAttribute(name, attribute.getName()));
- break;
- case "Added":
- throughputQueueBroadcast.setAdded((long) server.getAttribute(name, attribute.getName()));
- break;
- case "Throughput":
- throughputQueueBroadcast.setThroughput((double) server.getAttribute(name, attribute.getName()));
- break;
- }
- } catch (Exception e) {
- LOGGER.error("Exception while trying to deserialize ThroughputQueueBroadcast object: {}", e);
- }
- }
+ ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+ MBeanInfo info = server.getMBeanInfo(name);
+ throughputQueueBroadcast.setName(name.toString());
- return throughputQueueBroadcast;
- } catch (Exception e) {
- return null;
+ for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
+ try {
+ switch (attribute.getName()) {
+ case "CurrentSize":
+ throughputQueueBroadcast.setCurrentSize((long) server.getAttribute(name, attribute.getName()));
+ break;
+ case "AvgWait":
+ throughputQueueBroadcast.setAvgWait((double) server.getAttribute(name, attribute.getName()));
+ break;
+ case "MaxWait":
+ throughputQueueBroadcast.setMaxWait((long) server.getAttribute(name, attribute.getName()));
+ break;
+ case "Removed":
+ throughputQueueBroadcast.setRemoved((long) server.getAttribute(name, attribute.getName()));
+ break;
+ case "Added":
+ throughputQueueBroadcast.setAdded((long) server.getAttribute(name, attribute.getName()));
+ break;
+ case "Throughput":
+ throughputQueueBroadcast.setThroughput((double) server.getAttribute(name, attribute.getName()));
+ break;
+ default:
+ break;
+ }
+ } catch (Exception ex) {
+ LOGGER.error("Exception while trying to deserialize ThroughputQueueBroadcast object: {}", ex);
}
+ }
+
+ return throughputQueueBroadcast;
+ } catch (Exception ex) {
+ return null;
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java
index 28c7fa7..667e9f6 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java
@@ -15,20 +15,21 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.monitoring.persist;
import java.util.List;
/**
- * Interface to define how we persist messages (JMX/monitoring related)
+ * Interface to define how we persist messages (JMX/monitoring related).
*/
public interface MessagePersister {
- /**
- * Given a list of messages, persist them out through whatever appropriate
- * broadcast mechanism (HTTP request, SLF4J log, etc.)
- * @param messages
- * @return statusCode represents whether or not the persist was successful
- */
- int persistMessages(List<String> messages);
+ /**
+ * Given a list of messages, persist them out through whatever appropriate
+ * broadcast mechanism (HTTP request, SLF4J log, etc.).
+ * @param messages List of String messages
+ * @return statusCode represents whether or not the persist was successful
+ */
+ int persistMessages(List<String> messages);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java
index bf0591f..1466f31 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java
@@ -15,8 +15,11 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.monitoring.persist.impl;
+import org.apache.streams.monitoring.persist.MessagePersister;
+
import com.google.common.collect.Lists;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
@@ -25,70 +28,70 @@ import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
-import org.apache.streams.monitoring.persist.MessagePersister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class BroadcastMessagePersister implements MessagePersister {
- private final static Logger LOGGER = LoggerFactory.getLogger(BroadcastMessagePersister.class);
- private String broadcastURI;
- public BroadcastMessagePersister(String broadcastURI) {
- this.broadcastURI = broadcastURI;
- }
+ private static final Logger LOGGER = LoggerFactory.getLogger(BroadcastMessagePersister.class);
+ private String broadcastUri;
- @Override
- /**
- * Given a list of messages as Strings, broadcast them to the broadcastURI
- * (if one is defined)
- * @param messages
- * @return int status code from POST response
- */
- public int persistMessages(List<String> messages) {
- int responseCode = -1;
+ public BroadcastMessagePersister(String broadcastUri) {
+ this.broadcastUri = broadcastUri;
+ }
- if(broadcastURI != null) {
- try {
- HttpClient client = HttpClients.createDefault();
- HttpPost post = new HttpPost(broadcastURI);
+ @Override
+ /**
+ * Given a list of messages as Strings, broadcast them to the broadcastUri
+ * (if one is defined)
+ * @param messages
+ * @return int status code from POST response
+ */
+ public int persistMessages(List<String> messages) {
+ int responseCode = -1;
- post.setHeader("User-Agent", "Streams");
+ if (broadcastUri != null) {
+ try {
+ HttpClient client = HttpClients.createDefault();
+ HttpPost post = new HttpPost(broadcastUri);
- List<NameValuePair> urlParameters = Lists.newArrayList();
- urlParameters.add(new BasicNameValuePair("messages", serializeMessages(messages)));
+ post.setHeader("User-Agent", "Streams");
- post.setEntity(new UrlEncodedFormEntity(urlParameters, "UTF-8"));
+ List<NameValuePair> urlParameters = Lists.newArrayList();
+ urlParameters.add(new BasicNameValuePair("messages", serializeMessages(messages)));
- HttpResponse response = client.execute(post);
- responseCode = response.getStatusLine().getStatusCode();
+ post.setEntity(new UrlEncodedFormEntity(urlParameters, "UTF-8"));
- LOGGER.debug("Broadcast {} messages to URI: {}", messages.size(), broadcastURI);
- } catch (Exception e) {
- LOGGER.error("Failed to broadcast message to URI: {}, exception: {}", broadcastURI, e);
- }
- }
+ HttpResponse response = client.execute(post);
+ responseCode = response.getStatusLine().getStatusCode();
- return responseCode;
+ LOGGER.debug("Broadcast {} messages to URI: {}", messages.size(), broadcastUri);
+ } catch (Exception ex) {
+ LOGGER.error("Failed to broadcast message to URI: {}, exception: {}", broadcastUri, ex);
+ }
}
- /**
- * Given a List of String messages, convert them to a JSON array
- * @param messages
- * @return Serialized version of this JSON array
- */
- private String serializeMessages(List<String> messages) {
- String ser = "{\"messages\":[";
-
- for(String message : messages) {
- if(messages.get(messages.size()-1).equals(message)) {
- ser += message + "]}";
- } else {
- ser += message + ",";
- }
- }
-
- return ser;
+ return responseCode;
+ }
+
+ /**
+ * Given a List of String messages, convert them to a JSON array.
+ * @param messages List of String messages
+ * @return Serialized version of this JSON array
+ */
+ private String serializeMessages(List<String> messages) {
+ String ser = "{\"messages\":[";
+
+ for (String message : messages) {
+ if (messages.get(messages.size() - 1).equals(message)) {
+ ser += message + "]}";
+ } else {
+ ser += message + ",";
+ }
}
+
+ return ser;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
index 312502c..c697661 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
@@ -15,90 +15,93 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.monitoring.persist.impl;
import org.apache.streams.monitoring.persist.MessagePersister;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.concurrent.Executors;
public class LogstashUdpMessagePersister implements MessagePersister {
- private final static Logger LOGGER = LoggerFactory.getLogger(LogstashUdpMessagePersister.class);
- private String broadcastURI;
- URI uri;
+ private static final Logger LOGGER = LoggerFactory.getLogger(LogstashUdpMessagePersister.class);
+ private String broadcastUri;
+ URI uri;
+
+ public LogstashUdpMessagePersister(String broadcastUri) {
+ this.broadcastUri = broadcastUri;
+ setup();
+ }
- public LogstashUdpMessagePersister(String broadcastURI) {
- this.broadcastURI = broadcastURI;
- setup();
+ /**
+ * setup.
+ */
+ public void setup() {
+
+ try {
+ uri = new URI(broadcastUri);
+ } catch (URISyntaxException ex) {
+ LOGGER.error(ex.getMessage());
}
- public void setup() {
+ }
- try {
- uri = new URI(broadcastURI);
- } catch (URISyntaxException e) {
- LOGGER.error(e.getMessage());
- }
+ @Override
+ /**
+ * Given a list of messages as Strings, broadcast them to the broadcastUri
+ * (if one is defined)
+ * @param messages
+ * @return int status code from POST response
+ */
+ public int persistMessages(List<String> messages) {
+ int responseCode = -1;
- }
- @Override
- /**
- * Given a list of messages as Strings, broadcast them to the broadcastURI
- * (if one is defined)
- * @param messages
- * @return int status code from POST response
- */
- public int persistMessages(List<String> messages) {
- int responseCode = -1;
-
- if(broadcastURI != null) {
- DatagramSocket socket = null;
- try {
- socket = new DatagramSocket();
- } catch (SocketException e) {
- LOGGER.error("Metrics Broadcast Setup Failed: " + e.getMessage());
- }
- try {
- ByteBuffer toWrite = ByteBuffer.wrap(serializeMessages(messages).getBytes());
- byte[] byteArray = toWrite.array();
- DatagramPacket packet = new DatagramPacket(byteArray, byteArray.length);
- socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()));
- socket.send(packet);
- } catch( Exception e ) {
- LOGGER.error("Metrics Broadcast Failed: " + e.getMessage());
- } finally {
- socket.close();
- }
- }
-
- return responseCode;
+ if (broadcastUri != null) {
+ DatagramSocket socket = null;
+ try {
+ socket = new DatagramSocket();
+ } catch (SocketException ex) {
+ LOGGER.error("Metrics Broadcast Setup Failed: " + ex.getMessage());
+ }
+ try {
+ ByteBuffer toWrite = ByteBuffer.wrap(serializeMessages(messages).getBytes());
+ byte[] byteArray = toWrite.array();
+ DatagramPacket packet = new DatagramPacket(byteArray, byteArray.length);
+ socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()));
+ socket.send(packet);
+ } catch ( Exception ex ) {
+ LOGGER.error("Metrics Broadcast Failed: " + ex.getMessage());
+ } finally {
+ socket.close();
+ }
}
- /**
- * Given a List of String messages, convert them to a JSON array
- * @param messages
- * @return Serialized version of this JSON array
- */
- private String serializeMessages(List<String> messages) {
+ return responseCode;
+ }
- StringBuilder json_lines = new StringBuilder();
- for(String message : messages) {
- json_lines.append(message).append('\n');
- }
+ /**
+ * Given a List of String messages, convert them to a JSON array.
+ * @param messages List of String messages
+ * @return Serialized version of this JSON array
+ */
+ private String serializeMessages(List<String> messages) {
- return json_lines.toString();
+ StringBuilder jsonLines = new StringBuilder();
+ for (String message : messages) {
+ jsonLines.append(message).append('\n');
}
+ return jsonLines.toString();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java
deleted file mode 100644
index 19c36f2..0000000
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.monitoring.persist.impl;
-
-import org.apache.streams.monitoring.persist.MessagePersister;
-import org.slf4j.Logger;
-
-import java.util.List;
-
-public class SLF4JMessagePersister implements MessagePersister {
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(SLF4JMessagePersister.class);
- private static final int SUCCESS_STATUS = 0;
- private static final int FAILURE_STATUS = -1;
-
- public SLF4JMessagePersister() {
-
- }
-
- @Override
- public int persistMessages(List<String> messages) {
- for(String message : messages) {
- LOGGER.info(message);
- }
-
- return SUCCESS_STATUS;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java
new file mode 100644
index 0000000..b237871
--- /dev/null
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.monitoring.persist.impl;
+
+import org.apache.streams.monitoring.persist.MessagePersister;
+
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Persist montoring messages to SLF4J.
+ */
+public class Slf4jMessagePersister implements MessagePersister {
+
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(Slf4jMessagePersister.class);
+ private static final int SUCCESS_STATUS = 0;
+ private static final int FAILURE_STATUS = -1;
+
+ public Slf4jMessagePersister() {
+
+ }
+
+ @Override
+ public int persistMessages(List<String> messages) {
+
+ for (String message : messages) {
+ LOGGER.info(message);
+ }
+
+ return SUCCESS_STATUS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index f21a212..a797ce5 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -15,189 +15,204 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.monitoring.tasks;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.Lists;
import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.jackson.*;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.jackson.DatumStatusCounterDeserializer;
+import org.apache.streams.jackson.MemoryUsageDeserializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.jackson.StreamsTaskCounterDeserializer;
+import org.apache.streams.jackson.ThroughputQueueDeserializer;
import org.apache.streams.local.monitoring.MonitoringConfiguration;
import org.apache.streams.monitoring.persist.MessagePersister;
import org.apache.streams.monitoring.persist.impl.BroadcastMessagePersister;
import org.apache.streams.monitoring.persist.impl.LogstashUdpMessagePersister;
-import org.apache.streams.monitoring.persist.impl.SLF4JMessagePersister;
+import org.apache.streams.monitoring.persist.impl.Slf4jMessagePersister;
import org.apache.streams.pojo.json.Broadcast;
import org.apache.streams.pojo.json.DatumStatusCounterBroadcast;
import org.apache.streams.pojo.json.MemoryUsageBroadcast;
import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast;
import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.Lists;
import org.slf4j.Logger;
-import javax.management.*;
import java.lang.management.ManagementFactory;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.ObjectName;
/**
* This thread runs inside of a Streams runtime and periodically persists information
- * from relevant JMX beans
+ * from relevant JMX beans.
*/
public class BroadcastMonitorThread extends NotificationBroadcasterSupport implements Runnable {
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(BroadcastMonitorThread.class);
- private static MBeanServer server;
-
- private MonitoringConfiguration configuration;
- private URI broadcastURI = null;
- private MessagePersister messagePersister;
- private volatile boolean keepRunning;
-
- private static ObjectMapper objectMapper = StreamsJacksonMapper.getInstance();
-
- /**
- * DEPRECATED
- * Please initialize logging with monitoring object via typesafe
- * @param streamConfig
- */
- @Deprecated
- public BroadcastMonitorThread(Map<String, Object> streamConfig) {
- this(objectMapper.convertValue(streamConfig, MonitoringConfiguration.class));
- }
-
- public BroadcastMonitorThread(StreamsConfiguration streamConfig) {
- this(objectMapper.convertValue(streamConfig.getAdditionalProperties().get("monitoring"), MonitoringConfiguration.class));
- }
-
- public BroadcastMonitorThread(MonitoringConfiguration configuration) {
-
- this.configuration = configuration;
- if( this.configuration == null )
- this.configuration = new ComponentConfigurator<>(MonitoringConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().atPath("monitoring"));
-
- LOGGER.info("BroadcastMonitorThread created");
-
- initializeObjectMapper();
-
- prepare();
-
- LOGGER.info("BroadcastMonitorThread initialized");
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(BroadcastMonitorThread.class);
+ private static MBeanServer server;
+
+ private MonitoringConfiguration configuration;
+ private URI broadcastUri = null;
+ private MessagePersister messagePersister;
+ private volatile boolean keepRunning;
+
+ private static ObjectMapper objectMapper = StreamsJacksonMapper.getInstance();
+
+ /**
+ * DEPRECATED
+ * Please initialize logging with monitoring object via typesafe.
+ * @param streamConfig streamConfig map.
+ */
+ @Deprecated
+ public BroadcastMonitorThread(Map<String, Object> streamConfig) {
+ this(objectMapper.convertValue(streamConfig, MonitoringConfiguration.class));
+ }
+
+ public BroadcastMonitorThread(StreamsConfiguration streamConfig) {
+ this(objectMapper.convertValue(streamConfig.getAdditionalProperties().get("monitoring"), MonitoringConfiguration.class));
+ }
+
+ /**
+ * BroadcastMonitorThread constructor - uses supplied MonitoringConfiguration.
+ * @param configuration MonitoringConfiguration
+ */
+ public BroadcastMonitorThread(MonitoringConfiguration configuration) {
+
+ this.configuration = configuration;
+ if ( this.configuration == null ) {
+ this.configuration = new ComponentConfigurator<>(MonitoringConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().atPath("monitoring"));
}
- /**
- * Initialize our object mapper with all of our bean's custom deserializers
- * This way we can convert them to and from Strings dictated by our
- * POJOs which are generated from JSON schemas
- */
- private void initializeObjectMapper() {
- SimpleModule simpleModule = new SimpleModule();
-
- simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
- simpleModule.addDeserializer(ThroughputQueueBroadcast.class, new ThroughputQueueDeserializer());
- simpleModule.addDeserializer(StreamsTaskCounterBroadcast.class, new StreamsTaskCounterDeserializer());
- simpleModule.addDeserializer(DatumStatusCounterBroadcast.class, new DatumStatusCounterDeserializer());
-
- objectMapper.registerModule(simpleModule);
- objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
+ LOGGER.info("BroadcastMonitorThread created");
+
+ initializeObjectMapper();
+
+ prepare();
+
+ LOGGER.info("BroadcastMonitorThread initialized");
+
+ }
+
+ /**
+ * Initialize our object mapper with all of our bean's custom deserializers.
+ * This way we can convert them to and from Strings dictated by our
+ * POJOs which are generated from JSON schemas.
+ */
+ private void initializeObjectMapper() {
+ SimpleModule simpleModule = new SimpleModule();
+
+ simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
+ simpleModule.addDeserializer(ThroughputQueueBroadcast.class, new ThroughputQueueDeserializer());
+ simpleModule.addDeserializer(StreamsTaskCounterBroadcast.class, new StreamsTaskCounterDeserializer());
+ simpleModule.addDeserializer(DatumStatusCounterBroadcast.class, new DatumStatusCounterDeserializer());
+
+ objectMapper.registerModule(simpleModule);
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+ /**
+ * Get all relevant JMX beans, convert their values to strings, and then persist them.
+ */
+ @Override
+ public void run() {
+ LOGGER.info("BroadcastMonitorThread running");
+ while (keepRunning) {
+ try {
+ List<String> messages = Lists.newArrayList();
+ Set<ObjectName> beans = server.queryNames(null, null);
+
+ for (ObjectName name : beans) {
+ String item = objectMapper.writeValueAsString(name);
+ Broadcast broadcast = null;
+
+ if (name.getKeyPropertyList().get("type") != null) {
+ if (name.getKeyPropertyList().get("type").equals("ThroughputQueue")) {
+ broadcast = objectMapper.readValue(item, ThroughputQueueBroadcast.class);
+ } else if (name.getKeyPropertyList().get("type").equals("StreamsTaskCounter")) {
+ broadcast = objectMapper.readValue(item, StreamsTaskCounterBroadcast.class);
+ } else if (name.getKeyPropertyList().get("type").equals("DatumStatusCounter")) {
+ broadcast = objectMapper.readValue(item, DatumStatusCounterBroadcast.class);
+ } else if (name.getKeyPropertyList().get("type").equals("Memory")) {
+ broadcast = objectMapper.readValue(item, MemoryUsageBroadcast.class);
+ }
- /**
- * Get all relevant JMX beans, convert their values to strings, and then persist them
- */
- @Override
- public void run() {
- LOGGER.info("BroadcastMonitorThread running");
- while(keepRunning) {
- try {
- List<String> messages = Lists.newArrayList();
- Set<ObjectName> beans = server.queryNames(null, null);
-
- for(ObjectName name : beans) {
- String item = objectMapper.writeValueAsString(name);
- Broadcast broadcast = null;
-
- if(name.getKeyPropertyList().get("type") != null) {
- if (name.getKeyPropertyList().get("type").equals("ThroughputQueue")) {
- broadcast = objectMapper.readValue(item, ThroughputQueueBroadcast.class);
- } else if (name.getKeyPropertyList().get("type").equals("StreamsTaskCounter")) {
- broadcast = objectMapper.readValue(item, StreamsTaskCounterBroadcast.class);
- } else if (name.getKeyPropertyList().get("type").equals("DatumStatusCounter")) {
- broadcast = objectMapper.readValue(item, DatumStatusCounterBroadcast.class);
- } else if (name.getKeyPropertyList().get("type").equals("Memory")) {
- broadcast = objectMapper.readValue(item, MemoryUsageBroadcast.class);
- }
-
- if(broadcast != null) {
- messages.add(objectMapper.writeValueAsString(broadcast));
- }
- }
- }
-
- messagePersister.persistMessages(messages);
- Thread.sleep(configuration.getMonitoringBroadcastIntervalMs());
- } catch (InterruptedException e) {
- LOGGER.debug("Broadcast Monitor Interrupted!");
- Thread.currentThread().interrupt();
- this.keepRunning = false;
- } catch (Exception e) {
- LOGGER.error("Exception: {}", e);
- this.keepRunning = false;
+ if (broadcast != null) {
+ messages.add(objectMapper.writeValueAsString(broadcast));
}
+ }
}
+
+ messagePersister.persistMessages(messages);
+ Thread.sleep(configuration.getMonitoringBroadcastIntervalMs());
+ } catch (InterruptedException ex) {
+ LOGGER.debug("Broadcast Monitor Interrupted!");
+ Thread.currentThread().interrupt();
+ this.keepRunning = false;
+ } catch (Exception ex) {
+ LOGGER.error("Exception: {}", ex);
+ this.keepRunning = false;
+ }
}
+ }
- public void prepare() {
+ /**
+ * prepare for execution.
+ */
+ public void prepare() {
- keepRunning = true;
+ keepRunning = true;
- LOGGER.info("BroadcastMonitorThread setup " + this.configuration);
+ LOGGER.info("BroadcastMonitorThread setup " + this.configuration);
- server = ManagementFactory.getPlatformMBeanServer();
+ server = ManagementFactory.getPlatformMBeanServer();
- if (this.configuration != null &&
- this.configuration.getBroadcastURI() != null) {
+ if (this.configuration != null && this.configuration.getBroadcastURI() != null) {
- try {
- broadcastURI = new URI(configuration.getBroadcastURI());
- } catch (Exception e) {
- LOGGER.error("invalid URI: ", e);
- }
+ try {
+ broadcastUri = new URI(configuration.getBroadcastURI());
+ } catch (Exception ex) {
+ LOGGER.error("invalid URI: ", ex);
+ }
- if (broadcastURI != null) {
- if (broadcastURI.getScheme().equals("http")) {
- messagePersister = new BroadcastMessagePersister(broadcastURI.toString());
- } else if (broadcastURI.getScheme().equals("udp")) {
- messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString());
- } else {
- LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined.");
- throw new RuntimeException();
- }
- } else {
- messagePersister = new SLF4JMessagePersister();
- }
+ if (broadcastUri != null) {
+ if (broadcastUri.getScheme().equals("http")) {
+ messagePersister = new BroadcastMessagePersister(broadcastUri.toString());
+ } else if (broadcastUri.getScheme().equals("udp")) {
+ messagePersister = new LogstashUdpMessagePersister(broadcastUri.toString());
} else {
- messagePersister = new SLF4JMessagePersister();
+ LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined.");
+ throw new RuntimeException();
}
-
+ } else {
+ messagePersister = new Slf4jMessagePersister();
+ }
+ } else {
+ messagePersister = new Slf4jMessagePersister();
}
- public void shutdown() {
- this.keepRunning = false;
- LOGGER.debug("Shutting down BroadcastMonitor Thread");
- }
+ }
- public String getBroadcastURI() {
- return configuration.getBroadcastURI();
- }
+ public void shutdown() {
+ this.keepRunning = false;
+ LOGGER.debug("Shutting down BroadcastMonitor Thread");
+ }
- public long getWaitTime() {
- return configuration.getMonitoringBroadcastIntervalMs();
- }
+ public String getBroadcastUri() {
+ return configuration.getBroadcastURI();
+ }
+
+ public long getWaitTime() {
+ return configuration.getMonitoringBroadcastIntervalMs();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java b/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
index 1c68239..8bf3219 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
@@ -15,13 +15,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.jackson;
+import org.apache.streams.pojo.json.MemoryUsageBroadcast;
+
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.commons.lang3.StringUtils;
-import org.apache.streams.pojo.json.MemoryUsageBroadcast;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -35,43 +37,46 @@ import static org.junit.Assert.assertNotNull;
public class MemoryUsageDeserializerTest {
- private final static Logger LOGGER = LoggerFactory.getLogger(MemoryUsageDeserializerTest.class);
- private ObjectMapper objectMapper;
+ private static final Logger LOGGER = LoggerFactory.getLogger(MemoryUsageDeserializerTest.class);
+ private ObjectMapper objectMapper;
- @Before
- public void setup() {
- objectMapper = StreamsJacksonMapper.getInstance();
- SimpleModule simpleModule = new SimpleModule();
- simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
- objectMapper.registerModule(simpleModule);
- objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
+ /**
+ * setup.
+ */
+ @Before
+ public void setup() {
+ objectMapper = StreamsJacksonMapper.getInstance();
+ SimpleModule simpleModule = new SimpleModule();
+ simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
+ objectMapper.registerModule(simpleModule);
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
- @Test
- public void serDeTest() {
- InputStream is = MemoryUsageDeserializerTest.class.getResourceAsStream("/MemoryUsageObjects.json");
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
+ @Test
+ public void serDeTest() {
+ InputStream is = MemoryUsageDeserializerTest.class.getResourceAsStream("/MemoryUsageObjects.json");
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
- try {
- while (br.ready()) {
- String line = br.readLine();
- if (!StringUtils.isEmpty(line)) {
- LOGGER.info("raw: {}", line);
- MemoryUsageBroadcast broadcast = objectMapper.readValue(line, MemoryUsageBroadcast.class);
+ try {
+ while (br.ready()) {
+ String line = br.readLine();
+ if (!StringUtils.isEmpty(line)) {
+ LOGGER.info("raw: {}", line);
+ MemoryUsageBroadcast broadcast = objectMapper.readValue(line, MemoryUsageBroadcast.class);
- LOGGER.info("activity: {}", broadcast);
+ LOGGER.info("activity: {}", broadcast);
- assertNotNull(broadcast);
- assertNotNull(broadcast.getVerbose());
- assertNotNull(broadcast.getObjectPendingFinalizationCount());
- assertNotNull(broadcast.getHeapMemoryUsage());
- assertNotNull(broadcast.getNonHeapMemoryUsage());
- assertNotNull(broadcast.getName());
- }
- }
- } catch (Exception e) {
- LOGGER.error("Exception while testing serializability: {}", e);
+ assertNotNull(broadcast);
+ assertNotNull(broadcast.getVerbose());
+ assertNotNull(broadcast.getObjectPendingFinalizationCount());
+ assertNotNull(broadcast.getHeapMemoryUsage());
+ assertNotNull(broadcast.getNonHeapMemoryUsage());
+ assertNotNull(broadcast.getName());
}
+ }
+ } catch (Exception ex) {
+ LOGGER.error("Exception while testing serializability: {}", ex);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java
index 6e7ff6d..fc2ff71 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java
@@ -15,6 +15,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.monitoring.persist.impl;
import com.google.common.collect.Lists;
@@ -28,33 +29,33 @@ import static org.junit.Assert.assertNotNull;
public class BroadcastMessagePersisterTest {
- @Test
- public void testFailedPersist() {
- BroadcastMessagePersister persister = new BroadcastMessagePersister("http://fake.url.com/fake_endpointasdfasdfas");
-
- List<String> messages = Lists.newArrayList();
- for(int x = 0; x < 10; x ++) {
- messages.add("Fake_message #" + x);
- }
-
- int statusCode = persister.persistMessages(messages);
+ @Test
+ public void testFailedPersist() {
+ BroadcastMessagePersister persister = new BroadcastMessagePersister("http://fake.url.com/fake_endpointasdfasdfas");
- assertNotNull(statusCode);
- assertNotEquals(statusCode, 200);
+ List<String> messages = Lists.newArrayList();
+ for (int x = 0; x < 10; x++) {
+ messages.add("Fake_message #" + x);
}
- @Test
- public void testInvalidURL() {
- BroadcastMessagePersister persister = new BroadcastMessagePersister("h");
+ int statusCode = persister.persistMessages(messages);
- List<String> messages = Lists.newArrayList();
- for(int x = 0; x < 10; x ++) {
- messages.add("Fake_message #" + x);
- }
+ assertNotNull(statusCode);
+ assertNotEquals(statusCode, 200);
+ }
- int statusCode = persister.persistMessages(messages);
+ @Test
+ public void testInvalidUrl() {
+ BroadcastMessagePersister persister = new BroadcastMessagePersister("h");
- assertNotNull(statusCode);
- assertEquals(statusCode, -1);
+ List<String> messages = Lists.newArrayList();
+ for (int x = 0; x < 10; x++) {
+ messages.add("Fake_message #" + x);
}
+
+ int statusCode = persister.persistMessages(messages);
+
+ assertNotNull(statusCode);
+ assertEquals(statusCode, -1);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
index faa99a2..3f9a4c1 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
@@ -15,6 +15,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.monitoring.persist.impl;
import com.google.common.base.Splitter;
@@ -29,47 +30,51 @@ import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.List;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
public class LogstashUdpMessagePersisterTest {
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LogstashUdpMessagePersisterTest.class);
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LogstashUdpMessagePersisterTest.class);
- DatagramSocket socket = null;
+ DatagramSocket socket = null;
- @Before
- public void setup() {
- try {
- socket = new DatagramSocket(56789);
- } catch (SocketException e) {
- LOGGER.error("Metrics Broadcast Test Setup Failed: " + e.getMessage());
- }
+ /**
+ * setup.
+ */
+ @Before
+ public void setup() {
+ try {
+ socket = new DatagramSocket(56789);
+ } catch (SocketException ex) {
+ LOGGER.error("Metrics Broadcast Test Setup Failed: " + ex.getMessage());
}
+ }
- @Test
- public void testFailedPersist() {
- LogstashUdpMessagePersister persister = new LogstashUdpMessagePersister("udp://127.0.0.1:56789");
-
- List<String> messageArray = Lists.newArrayList();
- for(int x = 0; x < 10; x ++) {
- messageArray.add("Fake_message #" + x);
- }
+ @Test
+ public void testFailedPersist() {
+ LogstashUdpMessagePersister persister = new LogstashUdpMessagePersister("udp://127.0.0.1:56789");
- persister.persistMessages(messageArray);
- byte[] receiveData = new byte[1024];
+ List<String> messageArray = Lists.newArrayList();
+ for (int x = 0; x < 10; x ++) {
+ messageArray.add("Fake_message #" + x);
+ }
- DatagramPacket messageDatagram = new DatagramPacket(receiveData, receiveData.length);
+ persister.persistMessages(messageArray);
+ byte[] receiveData = new byte[1024];
- try {
- socket.receive(messageDatagram);
- assertNotNull(messageDatagram);
- List<String> messages = Lists.newArrayList(Splitter.on('\n').split(new String(messageDatagram.getData())));
- assertEquals(messageArray, messages.subList(0,10));
- } catch (IOException e) {
- LOGGER.error("Metrics Broadcast Test Failed: " + e.getMessage());
- }
+ DatagramPacket messageDatagram = new DatagramPacket(receiveData, receiveData.length);
+ try {
+ socket.receive(messageDatagram);
+ assertNotNull(messageDatagram);
+ List<String> messages = Lists.newArrayList(Splitter.on('\n').split(new String(messageDatagram.getData())));
+ assertEquals(messageArray, messages.subList(0,10));
+ } catch (IOException ex) {
+ LOGGER.error("Metrics Broadcast Test Failed: " + ex.getMessage());
}
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
index a959bd2..ad1bf05 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
@@ -15,67 +15,59 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.monitoring.tasks;
-import com.google.common.collect.Maps;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.local.monitoring.MonitoringConfiguration;
-import org.junit.Ignore;
+
import org.junit.Test;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BroadcastMonitorThreadTest {
- private ExecutorService executor;
- @Test
- public void testThreadEmptyBeanConfig() {
- StreamsConfiguration streamsConfiguration = new StreamsConfiguration();
- BroadcastMonitorThread thread = new BroadcastMonitorThread(streamsConfiguration);
- testThread(thread);
- }
+ private ExecutorService executor;
- @Test
- public void testThreadEmptyMapConfig() {
- Map<String, Object> map = Maps.newHashMap();
- BroadcastMonitorThread thread = new BroadcastMonitorThread(map);
- testThread(thread);
- }
+ @Test
+ public void testThreadEmptyBeanConfig() {
+ StreamsConfiguration streamsConfiguration = new StreamsConfiguration();
+ BroadcastMonitorThread thread = new BroadcastMonitorThread(streamsConfiguration);
+ testThread(thread);
+ }
- @Test
- public void testThreadFakeMapConfig() {
- Map<String, Object> config = Maps.newHashMap();
- config.put("broadcastURI", "http://fakeurl.com/fake");
- BroadcastMonitorThread thread = new BroadcastMonitorThread(config);
- testThread(thread);
- }
- @Test
- public void testThreadStreamsConfig() {
- StreamsConfiguration streams = new StreamsConfiguration();
- MonitoringConfiguration monitoring = new MonitoringConfiguration();
- monitoring.setBroadcastURI("http://fakeurl.com/fake");
- monitoring.setMonitoringBroadcastIntervalMs(30000L);
- streams.setAdditionalProperty("monitoring", monitoring);
- BroadcastMonitorThread thread = new BroadcastMonitorThread(streams);
- testThread(thread);
- }
- public void testThread(BroadcastMonitorThread thread) {
- long testRunLength = thread.getWaitTime() * 1;
- executor = Executors.newFixedThreadPool(1);
- executor.submit(thread);
+ @Test
+ public void testThreadStreamsConfig() {
- try {
- Thread.sleep(testRunLength);
- } catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ StreamsConfiguration streams = new StreamsConfiguration();
+ MonitoringConfiguration monitoring = new MonitoringConfiguration();
+ monitoring.setBroadcastURI("http://fakeurl.com/fake");
+ monitoring.setMonitoringBroadcastIntervalMs(30000L);
+ streams.setAdditionalProperty("monitoring", monitoring);
+ BroadcastMonitorThread thread = new BroadcastMonitorThread(streams);
+ testThread(thread);
+ }
- executor.shutdown();
+ /**
+ * Base Test.
+ * @param thread BroadcastMonitorThread
+ */
+ public void testThread(BroadcastMonitorThread thread) {
+ long testRunLength = thread.getWaitTime() * 1;
+ executor = Executors.newFixedThreadPool(1);
+ executor.submit(thread);
+
+ try {
+ Thread.sleep(testRunLength);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
}
+ executor.shutdown();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java b/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java
index 964fff6..971b99f 100644
--- a/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java
+++ b/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java
@@ -16,9 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.plugins.cassandra;
import org.apache.streams.util.schema.GenerationConfig;
+
import org.jsonschema2pojo.DefaultGenerationConfig;
import org.jsonschema2pojo.util.URLUtil;
@@ -32,68 +34,71 @@ import java.util.List;
import java.util.Set;
/**
- * Configures StreamsHiveResourceGenerator
- *
- *
+ * Configures StreamsCassandraResourceGenerator.
*/
public class StreamsCassandraGenerationConfig extends DefaultGenerationConfig implements GenerationConfig {
- public String getSourceDirectory() {
- return sourceDirectory;
- }
+ public String getSourceDirectory() {
+ return sourceDirectory;
+ }
- public List<String> getSourcePaths() {
- return sourcePaths;
- }
+ public List<String> getSourcePaths() {
+ return sourcePaths;
+ }
- private String sourceDirectory;
- private List<String> sourcePaths = new ArrayList<String>();
- private String targetDirectory;
- private int maxDepth = 1;
+ private String sourceDirectory;
+ private List<String> sourcePaths = new ArrayList<String>();
+ private String targetDirectory;
+ private int maxDepth = 1;
- public Set<String> getExclusions() {
- return exclusions;
- }
+ public Set<String> getExclusions() {
+ return exclusions;
+ }
- public void setExclusions(Set<String> exclusions) {
- this.exclusions = exclusions;
- }
+ public void setExclusions(Set<String> exclusions) {
+ this.exclusions = exclusions;
+ }
- private Set<String> exclusions = new HashSet<String>();
+ private Set<String> exclusions = new HashSet<String>();
- public int getMaxDepth() {
- return maxDepth;
- }
+ public int getMaxDepth() {
+ return maxDepth;
+ }
- public void setSourceDirectory(String sourceDirectory) {
- this.sourceDirectory = sourceDirectory;
- }
+ public void setSourceDirectory(String sourceDirectory) {
+ this.sourceDirectory = sourceDirectory;
+ }
- public void setSourcePaths(List<String> sourcePaths) {
- this.sourcePaths = sourcePaths;
- }
+ public void setSourcePaths(List<String> sourcePaths) {
+ this.sourcePaths = sourcePaths;
+ }
- public void setTargetDirectory(String targetDirectory) {
- this.targetDirectory = targetDirectory;
- }
+ public void setTargetDirectory(String targetDirectory) {
+ this.targetDirectory = targetDirectory;
+ }
- public File getTargetDirectory() {
- return new File(targetDirectory);
- }
+ public File getTargetDirectory() {
+ return new File(targetDirectory);
+ }
- public Iterator<URL> getSource() {
- if (null != sourceDirectory) {
- return Collections.singleton(URLUtil.parseURL(sourceDirectory)).iterator();
- }
- List<URL> sourceURLs = new ArrayList<URL>();
- if( sourcePaths != null && sourcePaths.size() > 0)
- for (String source : sourcePaths) {
- sourceURLs.add(URLUtil.parseURL(source));
- }
- return sourceURLs.iterator();
+ /**
+ * get all sources.
+ * @return Iterator of URL
+ */
+ public Iterator<URL> getSource() {
+ if (null != sourceDirectory) {
+ return Collections.singleton(URLUtil.parseURL(sourceDirectory)).iterator();
}
-
- public void setMaxDepth(int maxDepth) {
- this.maxDepth = maxDepth;
+ List<URL> sourceUrls = new ArrayList<URL>();
+ if ( sourcePaths != null && sourcePaths.size() > 0) {
+ for (String source : sourcePaths) {
+ sourceUrls.add(URLUtil.parseURL(source));
+ }
}
+ return sourceUrls.iterator();
+ }
+
+ public void setMaxDepth(int maxDepth) {
+ this.maxDepth = maxDepth;
+ }
}