You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/13 06:24:25 UTC
[3/6] incubator-streams git commit: Merge
Merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/356f6472
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/356f6472
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/356f6472
Branch: refs/heads/master
Commit: 356f647200c4304e27a92b8b2051c84da47b31af
Parents: b03b1b4
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Wed Nov 12 17:38:40 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Wed Nov 12 17:38:40 2014 -0600
----------------------------------------------------------------------
.../jackson/DatumStatusCounterDeserializer.java | 76 +++++++++++++++++
.../jackson/MemoryUsageDeserializer.java | 79 ++++++++++++++++++
.../jackson/StreamsTaskCounterDeserializer.java | 88 ++++++++++++++++++++
.../jackson/ThroughputQueueDeserializer.java | 87 +++++++++++++++++++
.../org/apache/streams/pojo/json/Broadcast.json | 13 +++
.../pojo/json/DatumStatusCounterBroadcast.json | 22 +++++
.../streams/pojo/json/MemoryUsageBroadcast.json | 30 +++++++
.../pojo/json/StreamsTaskCounterBroadcast.json | 38 +++++++++
.../pojo/json/ThroughputQueueBroadcast.json | 38 +++++++++
.../jackson/MemoryUsageDeserializerTest.java | 77 +++++++++++++++++
.../src/test/resources/MemoryUsageObjects.json | 1 +
.../jackson/DatumStatusCounterDeserializer.java | 76 -----------------
.../jackson/MemoryUsageDeserializer.java | 79 ------------------
.../jackson/StreamsTaskCounterDeserializer.java | 88 --------------------
.../jackson/ThroughputQueueDeserializer.java | 87 -------------------
.../org/apache/streams/pojo/json/Broadcast.json | 13 ---
.../pojo/json/DatumStatusCounterBroadcast.json | 22 -----
.../streams/pojo/json/MemoryUsageBroadcast.json | 30 -------
.../pojo/json/StreamsTaskCounterBroadcast.json | 38 ---------
.../pojo/json/ThroughputQueueBroadcast.json | 38 ---------
.../jackson/MemoryUsageDeserializerTest.java | 77 -----------------
.../src/test/resources/MemoryUsageObjects.json | 1 -
22 files changed, 549 insertions(+), 549 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
new file mode 100644
index 0000000..8bfa28b
--- /dev/null
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.streams.pojo.json.DatumStatusCounterBroadcast;
+import org.slf4j.Logger;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+
+public class DatumStatusCounterDeserializer extends JsonDeserializer<DatumStatusCounterBroadcast> {
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DatumStatusCounterDeserializer.class);
+
+ public DatumStatusCounterDeserializer() {
+
+ }
+
+ @Override
+ public DatumStatusCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+ try {
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+ DatumStatusCounterBroadcast datumStatusCounterBroadcast = new DatumStatusCounterBroadcast();
+ JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+
+ ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+ MBeanInfo info = server.getMBeanInfo(name);
+ datumStatusCounterBroadcast.setName(name.toString());
+
+ for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
+ try {
+ switch(attribute.getName()) {
+ case "Failed":
+ datumStatusCounterBroadcast.setFailed((boolean) server.getAttribute(name, attribute.getName()));
+ break;
+ case "Passed":
+ datumStatusCounterBroadcast.setPassed((boolean) server.getAttribute(name, attribute.getName()));
+ break;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", e);
+ }
+ }
+
+ return datumStatusCounterBroadcast;
+ } catch (Exception e) {
+ LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
new file mode 100644
index 0000000..43c9239
--- /dev/null
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.streams.pojo.json.MemoryUsageBroadcast;
+import org.slf4j.Logger;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeDataSupport;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+
+/**
+ * Jackson deserializer that produces a {@link MemoryUsageBroadcast} snapshot.
+ *
+ * <p>The incoming JSON is only consulted for its {@code canonicalName} field, which is parsed as a
+ * JMX {@link ObjectName}. The attribute values themselves are read from the MBean registered
+ * under that name on the platform {@link MBeanServer}.
+ */
+public class MemoryUsageDeserializer extends JsonDeserializer<MemoryUsageBroadcast> {
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MemoryUsageDeserializer.class);
+
+    public MemoryUsageDeserializer() {
+
+    }
+
+    /**
+     * Reads {@code canonicalName} from the JSON, looks the MBean up on the platform MBean server
+     * and copies {@code Verbose}, {@code ObjectPendingFinalizationCount} and the {@code used}
+     * component of {@code HeapMemoryUsage} / {@code NonHeapMemoryUsage} into a new broadcast.
+     *
+     * @param jsonParser parser supplying the JSON object that carries {@code canonicalName}
+     * @param deserializationContext not used by this implementation
+     * @return the populated broadcast, or {@code null} (after logging the error) if any step,
+     *         including a single attribute read, fails
+     */
+    @Override
+    public MemoryUsageBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+        try {
+            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+            MemoryUsageBroadcast memoryUsageBroadcast = new MemoryUsageBroadcast();
+            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+
+            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+            MBeanInfo info = server.getMBeanInfo(name);
+            memoryUsageBroadcast.setName(name.toString());
+
+            for (MBeanAttributeInfo attribute : info.getAttributes()) {
+                String attributeName = attribute.getName();
+                switch (attributeName) {
+                    case "Verbose":
+                        memoryUsageBroadcast.setVerbose((boolean) server.getAttribute(name, attributeName));
+                        break;
+                    case "ObjectPendingFinalizationCount":
+                        // Parsed via its string form, so the boxed numeric type does not matter.
+                        memoryUsageBroadcast.setObjectPendingFinalizationCount(Long.parseLong(server.getAttribute(name, attributeName).toString()));
+                        break;
+                    case "HeapMemoryUsage":
+                        // Only the "used" item of the composite value is kept.
+                        memoryUsageBroadcast.setHeapMemoryUsage((Long) ((CompositeDataSupport) server.getAttribute(name, attributeName)).get("used"));
+                        break;
+                    case "NonHeapMemoryUsage":
+                        memoryUsageBroadcast.setNonHeapMemoryUsage((Long) ((CompositeDataSupport) server.getAttribute(name, attributeName)).get("used"));
+                        break;
+                }
+            }
+
+            return memoryUsageBroadcast;
+        } catch (Exception e) {
+            // The throwable goes last and has no placeholder, so SLF4J logs its stack trace.
+            LOGGER.error("Exception trying to deserialize MemoryUsageBroadcast object", e);
+            return null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
new file mode 100644
index 0000000..8b65bf3
--- /dev/null
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast;
+import org.slf4j.Logger;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+
+/**
+ * Jackson deserializer that produces a {@link StreamsTaskCounterBroadcast} snapshot.
+ *
+ * <p>The incoming JSON is only consulted for its {@code canonicalName} field, which is parsed as a
+ * JMX {@link ObjectName}. The attribute values themselves are read from the MBean registered
+ * under that name on the platform {@link MBeanServer}.
+ */
+public class StreamsTaskCounterDeserializer extends JsonDeserializer<StreamsTaskCounterBroadcast> {
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsTaskCounterDeserializer.class);
+
+    public StreamsTaskCounterDeserializer() {
+
+    }
+
+    /**
+     * Reads {@code canonicalName} from the JSON, looks the MBean up on the platform MBean server
+     * and copies its {@code ErrorRate}, {@code NumEmitted}, {@code NumReceived},
+     * {@code NumUnhandledErrors}, {@code AvgTime} and {@code MaxTime} attributes into a new
+     * broadcast object.
+     *
+     * <p>A failure while reading a single attribute is logged and that attribute is skipped.
+     *
+     * @param jsonParser parser supplying the JSON object that carries {@code canonicalName}
+     * @param deserializationContext not used by this implementation
+     * @return the populated broadcast, or {@code null} (after logging the error) if anything
+     *         other than a single attribute read fails
+     */
+    @Override
+    public StreamsTaskCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+        try {
+            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+            StreamsTaskCounterBroadcast streamsTaskCounterBroadcast = new StreamsTaskCounterBroadcast();
+            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+
+            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+            MBeanInfo info = server.getMBeanInfo(name);
+            streamsTaskCounterBroadcast.setName(name.toString());
+
+            for (MBeanAttributeInfo attribute : info.getAttributes()) {
+                String attributeName = attribute.getName();
+                try {
+                    switch (attributeName) {
+                        case "ErrorRate":
+                            streamsTaskCounterBroadcast.setErrorRate((double) server.getAttribute(name, attributeName));
+                            break;
+                        case "NumEmitted":
+                            streamsTaskCounterBroadcast.setNumEmitted((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "NumReceived":
+                            streamsTaskCounterBroadcast.setNumReceived((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "NumUnhandledErrors":
+                            streamsTaskCounterBroadcast.setNumUnhandledErrors((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "AvgTime":
+                            streamsTaskCounterBroadcast.setAvgTime((double) server.getAttribute(name, attributeName));
+                            break;
+                        case "MaxTime":
+                            streamsTaskCounterBroadcast.setMaxTime((long) server.getAttribute(name, attributeName));
+                            break;
+                    }
+                } catch (Exception e) {
+                    // The throwable goes last and has no placeholder, so SLF4J logs its stack trace.
+                    LOGGER.error("Exception while trying to deserialize attribute '{}' of StreamsTaskCounterBroadcast object", attributeName, e);
+                }
+            }
+
+            return streamsTaskCounterBroadcast;
+        } catch (Exception e) {
+            LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object", e);
+            return null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
new file mode 100644
index 0000000..e4d883d
--- /dev/null
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
+import org.slf4j.Logger;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+
+/**
+ * Jackson deserializer that produces a {@link ThroughputQueueBroadcast} snapshot.
+ *
+ * <p>The incoming JSON is only consulted for its {@code canonicalName} field, which is parsed as a
+ * JMX {@link ObjectName}. The attribute values themselves are read from the MBean registered
+ * under that name on the platform {@link MBeanServer}.
+ */
+public class ThroughputQueueDeserializer extends JsonDeserializer<ThroughputQueueBroadcast> {
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThroughputQueueDeserializer.class);
+
+    public ThroughputQueueDeserializer() {
+
+    }
+
+    /**
+     * Reads {@code canonicalName} from the JSON, looks the MBean up on the platform MBean server
+     * and copies its {@code CurrentSize}, {@code AvgWait}, {@code MaxWait}, {@code Removed},
+     * {@code Added} and {@code Throughput} attributes into a new broadcast object.
+     *
+     * <p>A failure while reading a single attribute is logged and that attribute is skipped.
+     *
+     * @param jsonParser parser supplying the JSON object that carries {@code canonicalName}
+     * @param deserializationContext not used by this implementation
+     * @return the populated broadcast, or {@code null} (after logging the error) if anything
+     *         other than a single attribute read fails
+     */
+    @Override
+    public ThroughputQueueBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+        try {
+            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+            ThroughputQueueBroadcast throughputQueueBroadcast = new ThroughputQueueBroadcast();
+            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+
+            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+            MBeanInfo info = server.getMBeanInfo(name);
+            throughputQueueBroadcast.setName(name.toString());
+
+            for (MBeanAttributeInfo attribute : info.getAttributes()) {
+                String attributeName = attribute.getName();
+                try {
+                    switch (attributeName) {
+                        case "CurrentSize":
+                            throughputQueueBroadcast.setCurrentSize((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "AvgWait":
+                            throughputQueueBroadcast.setAvgWait((double) server.getAttribute(name, attributeName));
+                            break;
+                        case "MaxWait":
+                            // NOTE(review): read as long here while the schema declares maxWait
+                            // with type "double" - confirm which one is intended.
+                            throughputQueueBroadcast.setMaxWait((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "Removed":
+                            throughputQueueBroadcast.setRemoved((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "Added":
+                            throughputQueueBroadcast.setAdded((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "Throughput":
+                            throughputQueueBroadcast.setThroughput((double) server.getAttribute(name, attributeName));
+                            break;
+                    }
+                } catch (Exception e) {
+                    // The throwable goes last and has no placeholder, so SLF4J logs its stack trace.
+                    LOGGER.error("Exception while trying to deserialize attribute '{}' of ThroughputQueueBroadcast object", attributeName, e);
+                }
+            }
+
+            return throughputQueueBroadcast;
+        } catch (Exception e) {
+            // Log before returning null so a failed lookup is not silently lost.
+            LOGGER.error("Exception while trying to deserialize ThroughputQueueBroadcast object", e);
+            return null;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
new file mode 100644
index 0000000..4d7f87b
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
@@ -0,0 +1,13 @@
+{
+ "type" : "object",
+ "title" : "object",
+ "javaType": "org.apache.streams.pojo.json.Broadcast",
+ "javaInterfaces": ["java.io.Serializable"],
+ "description" : "Base Broadcast class",
+ "properties" : {
+ "name": {
+ "type": "string",
+ "description": "Name of the MBean"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
new file mode 100644
index 0000000..ac86d21
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
@@ -0,0 +1,22 @@
+{
+ "type" : "object",
+ "title" : "object",
+ "extends" : {"$ref": "./Broadcast.json"},
+ "javaType": "org.apache.streams.pojo.json.DatumStatusCounterBroadcast",
+ "javaInterfaces": ["java.io.Serializable"],
+ "description" : "Snapshot of the DatumStatusCounter",
+ "properties" : {
+ "passed": {
+ "type": "boolean",
+ "description": "Number of objects that have passed"
+ },
+ "failed": {
+ "type": "boolean",
+ "description": "Number of objects that have failed"
+ },
+ "name": {
+ "type": "string",
+ "description": "Name of the MBean"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
new file mode 100644
index 0000000..4a17d41
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
@@ -0,0 +1,30 @@
+{
+ "type" : "object",
+ "title" : "object",
+ "extends" : {"$ref": "./Broadcast.json"},
+ "javaType": "org.apache.streams.pojo.json.MemoryUsageBroadcast",
+ "javaInterfaces": ["java.io.Serializable"],
+ "description" : "Snapshot of current memory usage",
+ "properties" : {
+ "verbose" : {
+ "type" : "boolean",
+ "description": "Whether or not this is verbose"
+ },
+ "objectPendingFinalizationCount": {
+ "type": "integer",
+ "description": "The number of objects that are pending finalization"
+ },
+ "heapMemoryUsage": {
+ "type": "integer",
+ "description": "The amount of heap memory we are currently using"
+ },
+ "nonHeapMemoryUsage": {
+ "type": "integer",
+ "description": "The amount of non-heap memory we are using"
+ },
+ "name": {
+ "type": "string",
+ "description": "The name of this MBean"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
new file mode 100644
index 0000000..d599697
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
@@ -0,0 +1,38 @@
+{
+ "type" : "object",
+ "title" : "object",
+ "javaType": "org.apache.streams.pojo.json.StreamsTaskCounterBroadcast",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends" : {"$ref": "./Broadcast.json"},
+ "description" : "Snapshot of a Stream Task Counter",
+ "properties" : {
+ "errorRate": {
+ "type": "double",
+ "description": "Rate of failed items"
+ },
+ "numEmitted": {
+ "type": "integer",
+ "description": "Number of items that have been emitted"
+ },
+ "numReceived": {
+ "type": "integer",
+ "description": "Number of items this Task has received"
+ },
+ "numUnhandledErrors": {
+ "type": "integer",
+ "description": "Number of unhandled errors"
+ },
+ "avgTime": {
+ "type": "double",
+ "description": "Average amount of time an item spent in this Task"
+ },
+ "maxTime": {
+ "type": "integer",
+ "description": "Longest amount of time an item spent in this Task"
+ },
+ "name": {
+ "type": "string",
+ "description": "Name of the MBean"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
new file mode 100644
index 0000000..60796db
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
@@ -0,0 +1,38 @@
+{
+ "type" : "object",
+ "title" : "object",
+ "extends" : {"$ref": "./Broadcast.json"},
+ "javaType": "org.apache.streams.pojo.json.ThroughputQueueBroadcast",
+ "javaInterfaces": ["java.io.Serializable"],
+ "description" : "Snapshot of a ThroughputQueue's performance",
+ "properties" : {
+ "currentSize": {
+ "type": "integer",
+ "description": "Current size of the queue"
+ },
+ "avgWait": {
+ "type": "double",
+ "description": "Average wait time"
+ },
+ "maxWait": {
+ "type": "double",
+ "description": "Maximum wait time"
+ },
+ "removed": {
+ "type": "integer",
+ "description": "Number of elements removed from the queue"
+ },
+ "added": {
+ "type": "integer",
+ "description": "Number of elements added to the queue"
+ },
+ "throughput": {
+ "type": "double",
+ "description": "Number of elements that have passed through the queue per second"
+ },
+ "name": {
+ "type": "string",
+ "description": "Name of the MBean"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java b/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
new file mode 100644
index 0000000..c0ba7fb
--- /dev/null
+++ b/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.pojo.json.*;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Checks that {@link MemoryUsageDeserializer} turns each JSON line of
+ * {@code /MemoryUsageObjects.json} into a fully populated {@link MemoryUsageBroadcast}.
+ */
+public class MemoryUsageDeserializerTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MemoryUsageDeserializerTest.class);
+    private ObjectMapper objectMapper;
+
+    /** Builds a mapper with the deserializer under test registered for {@link MemoryUsageBroadcast}. */
+    @Before
+    public void setup() {
+        objectMapper = new StreamsJacksonMapper();
+        SimpleModule simpleModule = new SimpleModule();
+        simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
+        objectMapper.registerModule(simpleModule);
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    /**
+     * Deserializes every non-empty line of the resource and asserts that all broadcast fields
+     * were set. Exceptions propagate so that an I/O or parsing failure fails the test instead of
+     * only being logged.
+     */
+    @Test
+    public void serDeTest() throws Exception {
+        InputStream is = MemoryUsageDeserializerTest.class.getResourceAsStream("/MemoryUsageObjects.json");
+        assertNotNull("Test resource /MemoryUsageObjects.json not found", is);
+
+        // try-with-resources closes the reader (and the underlying stream) even when an assert fails.
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
+            String line;
+            // readLine() returning null is the end-of-stream signal; ready() only reports
+            // whether a read would block.
+            while ((line = br.readLine()) != null) {
+                if (StringUtils.isEmpty(line)) {
+                    continue;
+                }
+
+                LOGGER.info("raw: {}", line);
+                MemoryUsageBroadcast broadcast = objectMapper.readValue(line, MemoryUsageBroadcast.class);
+
+                LOGGER.info("activity: {}", broadcast);
+
+                assertNotNull(broadcast);
+                assertNotNull(broadcast.getVerbose());
+                assertNotNull(broadcast.getObjectPendingFinalizationCount());
+                assertNotNull(broadcast.getHeapMemoryUsage());
+                assertNotNull(broadcast.getNonHeapMemoryUsage());
+                assertNotNull(broadcast.getName());
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/test/resources/MemoryUsageObjects.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/resources/MemoryUsageObjects.json b/streams-monitoring/src/test/resources/MemoryUsageObjects.json
new file mode 100644
index 0000000..f955fdc
--- /dev/null
+++ b/streams-monitoring/src/test/resources/MemoryUsageObjects.json
@@ -0,0 +1 @@
+{"canonicalName":"java.lang:type=Memory","pattern":false,"domainPattern":false,"propertyPattern":false,"propertyListPattern":false,"propertyValuePattern":false,"domain":"java.lang","keyPropertyList":{"type":"Memory"},"keyPropertyListString":"type=Memory","canonicalKeyPropertyListString":"type=Memory"}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
deleted file mode 100644
index 8bfa28b..0000000
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.jackson;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.DatumStatusCounterBroadcast;
-import org.slf4j.Logger;
-
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-
-public class DatumStatusCounterDeserializer extends JsonDeserializer<DatumStatusCounterBroadcast> {
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DatumStatusCounterDeserializer.class);
-
- public DatumStatusCounterDeserializer() {
-
- }
-
- @Override
- public DatumStatusCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
- try {
- MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
- DatumStatusCounterBroadcast datumStatusCounterBroadcast = new DatumStatusCounterBroadcast();
- JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
-
- ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
- MBeanInfo info = server.getMBeanInfo(name);
- datumStatusCounterBroadcast.setName(name.toString());
-
- for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
- try {
- switch(attribute.getName()) {
- case "Failed":
- datumStatusCounterBroadcast.setFailed((boolean) server.getAttribute(name, attribute.getName()));
- break;
- case "Passed":
- datumStatusCounterBroadcast.setPassed((boolean) server.getAttribute(name, attribute.getName()));
- break;
- }
- } catch (Exception e) {
- LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", e);
- }
- }
-
- return datumStatusCounterBroadcast;
- } catch (Exception e) {
- LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", e);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
deleted file mode 100644
index 43c9239..0000000
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.jackson;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.MemoryUsageBroadcast;
-import org.slf4j.Logger;
-
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeDataSupport;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-
-public class MemoryUsageDeserializer extends JsonDeserializer<MemoryUsageBroadcast> {
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MemoryUsageDeserializer.class);
-
- public MemoryUsageDeserializer() {
-
- }
-
- @Override
- public MemoryUsageBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
- try {
- MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
- MemoryUsageBroadcast memoryUsageBroadcast = new MemoryUsageBroadcast();
- JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
-
- ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
- MBeanInfo info = server.getMBeanInfo(name);
- memoryUsageBroadcast.setName(name.toString());
-
- for(MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
- switch(attribute.getName()) {
- case "Verbose":
- memoryUsageBroadcast.setVerbose((boolean) server.getAttribute(name, attribute.getName()));
- break;
- case "ObjectPendingFinalizationCount":
- memoryUsageBroadcast.setObjectPendingFinalizationCount(Long.parseLong(server.getAttribute(name, attribute.getName()).toString()));
- break;
- case "HeapMemoryUsage":
- memoryUsageBroadcast.setHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used"));
- break;
- case "NonHeapMemoryUsage":
- memoryUsageBroadcast.setNonHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used"));
- break;
- }
- }
-
- return memoryUsageBroadcast;
- } catch (Exception e) {
- LOGGER.error("Exception trying to deserialize MemoryUsageDeserializer object: {}", e);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
deleted file mode 100644
index 8b65bf3..0000000
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.jackson;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast;
-import org.slf4j.Logger;
-
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-
-public class StreamsTaskCounterDeserializer extends JsonDeserializer<StreamsTaskCounterBroadcast> {
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsTaskCounterDeserializer.class);
-
- public StreamsTaskCounterDeserializer() {
-
- }
-
- @Override
- public StreamsTaskCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
- try {
- MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
- StreamsTaskCounterBroadcast streamsTaskCounterBroadcast = new StreamsTaskCounterBroadcast();
- JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
-
- ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
- MBeanInfo info = server.getMBeanInfo(name);
- streamsTaskCounterBroadcast.setName(name.toString());
-
- for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
- try {
- switch (attribute.getName()) {
- case "ErrorRate":
- streamsTaskCounterBroadcast.setErrorRate((double) server.getAttribute(name, attribute.getName()));
- break;
- case "NumEmitted":
- streamsTaskCounterBroadcast.setNumEmitted((long) server.getAttribute(name, attribute.getName()));
- break;
- case "NumReceived":
- streamsTaskCounterBroadcast.setNumReceived((long) server.getAttribute(name, attribute.getName()));
- break;
- case "NumUnhandledErrors":
- streamsTaskCounterBroadcast.setNumUnhandledErrors((long) server.getAttribute(name, attribute.getName()));
- break;
- case "AvgTime":
- streamsTaskCounterBroadcast.setAvgTime((double) server.getAttribute(name, attribute.getName()));
- break;
- case "MaxTime":
- streamsTaskCounterBroadcast.setMaxTime((long) server.getAttribute(name, attribute.getName()));
- break;
- }
- } catch (Exception e) {
- LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", e);
- }
- }
-
- return streamsTaskCounterBroadcast;
- } catch (Exception e) {
- LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", e);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
deleted file mode 100644
index e4d883d..0000000
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.jackson;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
-import org.slf4j.Logger;
-
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-
-public class ThroughputQueueDeserializer extends JsonDeserializer<ThroughputQueueBroadcast> {
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThroughputQueueDeserializer.class);
-
- public ThroughputQueueDeserializer() {
-
- }
-
- @Override
- public ThroughputQueueBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
- try {
- MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
- ThroughputQueueBroadcast throughputQueueBroadcast = new ThroughputQueueBroadcast();
- JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
-
- ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
- MBeanInfo info = server.getMBeanInfo(name);
- throughputQueueBroadcast.setName(name.toString());
-
- for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
- try {
- switch(attribute.getName()) {
- case "CurrentSize":
- throughputQueueBroadcast.setCurrentSize((long) server.getAttribute(name, attribute.getName()));
- break;
- case "AvgWait":
- throughputQueueBroadcast.setAvgWait((double) server.getAttribute(name, attribute.getName()));
- break;
- case "MaxWait":
- throughputQueueBroadcast.setMaxWait((long) server.getAttribute(name, attribute.getName()));
- break;
- case "Removed":
- throughputQueueBroadcast.setRemoved((long) server.getAttribute(name, attribute.getName()));
- break;
- case "Added":
- throughputQueueBroadcast.setAdded((long) server.getAttribute(name, attribute.getName()));
- break;
- case "Throughput":
- throughputQueueBroadcast.setThroughput((double) server.getAttribute(name, attribute.getName()));
- break;
- }
- } catch (Exception e) {
- LOGGER.error("Exception while trying to deserialize ThroughputQueueBroadcast object: {}", e);
- }
- }
-
- return throughputQueueBroadcast;
- } catch (Exception e) {
- return null;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
deleted file mode 100644
index 4d7f87b..0000000
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
- "type" : "object",
- "title" : "object",
- "javaType": "org.apache.streams.pojo.json.Broadcast",
- "javaInterfaces": ["java.io.Serializable"],
- "description" : "Base Broadcast class",
- "properties" : {
- "name": {
- "type": "string",
- "description": "Name of the MBean"
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
deleted file mode 100644
index ac86d21..0000000
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
- "type" : "object",
- "title" : "object",
- "extends" : {"$ref": "./Broadcast.json"},
- "javaType": "org.apache.streams.pojo.json.DatumStatusCounterBroadcast",
- "javaInterfaces": ["java.io.Serializable"],
- "description" : "Snapshot of the DatumStatusCounter",
- "properties" : {
- "passed": {
- "type": "boolean",
- "description": "Number of objects that have passed"
- },
- "failed": {
- "type": "boolean",
- "description": "Number of objects that have failed"
- },
- "name": {
- "type": "string",
- "description": "Name of the MBean"
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
deleted file mode 100644
index 4a17d41..0000000
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
- "type" : "object",
- "title" : "object",
- "extends" : {"$ref": "./Broadcast.json"},
- "javaType": "org.apache.streams.pojo.json.MemoryUsageBroadcast",
- "javaInterfaces": ["java.io.Serializable"],
- "description" : "Snapshot of current memory usage",
- "properties" : {
- "verbose" : {
- "type" : "boolean",
- "description": "Whether or not this is verbose"
- },
- "objectPendingFinalizationCount": {
- "type": "integer",
- "description": "The number of objects that are pending finalization"
- },
- "heapMemoryUsage": {
- "type": "integer",
- "description": "The amount of heap memory we are currently using"
- },
- "nonHeapMemoryUsage": {
- "type": "integer",
- "description": "The amount of non-heap memory we are using"
- },
- "name": {
- "type": "string",
- "description": "The name of this MBean"
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
deleted file mode 100644
index d599697..0000000
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
+++ /dev/null
@@ -1,38 +0,0 @@
-{
- "type" : "object",
- "title" : "object",
- "javaType": "org.apache.streams.pojo.json.StreamsTaskCounterBroadcast",
- "javaInterfaces": ["java.io.Serializable"],
- "extends" : {"$ref": "./Broadcast.json"},
- "description" : "Snapshot of a Stream Task Counter",
- "properties" : {
- "errorRate": {
- "type": "double",
- "description": "Rate of failed items"
- },
- "numEmitted": {
- "type": "integer",
- "description": "Number of items that have been emitted"
- },
- "numReceived": {
- "type": "integer",
- "description": "Number of items this Task has received"
- },
- "numUnhandledErrors": {
- "type": "integer",
- "description": "Number of unhandled errors"
- },
- "avgTime": {
- "type": "double",
- "description": "Average amount of time an item spent in this Task"
- },
- "maxTime": {
- "type": "integer",
- "description": "Longest amount of time an item spent in this Task"
- },
- "name": {
- "type": "string",
- "description": "Name of the MBean"
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
deleted file mode 100644
index 60796db..0000000
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
+++ /dev/null
@@ -1,38 +0,0 @@
-{
- "type" : "object",
- "title" : "object",
- "extends" : {"$ref": "./Broadcast.json"},
- "javaType": "org.apache.streams.pojo.json.ThroughputQueueBroadcast",
- "javaInterfaces": ["java.io.Serializable"],
- "description" : "Snapshot of a ThroughputQueue's performance",
- "properties" : {
- "currentSize": {
- "type": "integer",
- "description": "Current size of the queue"
- },
- "avgWait": {
- "type": "double",
- "description": "Average wait time"
- },
- "maxWait": {
- "type": "double",
- "description": "Maximum wait time"
- },
- "removed": {
- "type": "integer",
- "description": "Number of elements removed from the queue"
- },
- "added": {
- "type": "integer",
- "description": "Number of elements added to the queue"
- },
- "throughput": {
- "type": "double",
- "description": "Number of elements that have passed through the queue per second"
- },
- "name": {
- "type": "string",
- "description": "Name of the MBean"
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java b/streams-pojo/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
deleted file mode 100644
index c0ba7fb..0000000
--- a/streams-pojo/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.jackson;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.pojo.json.*;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import static org.junit.Assert.assertNotNull;
-
-public class MemoryUsageDeserializerTest {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(MemoryUsageDeserializerTest.class);
- private ObjectMapper objectMapper;
-
- @Before
- public void setup() {
- objectMapper = new StreamsJacksonMapper();
- SimpleModule simpleModule = new SimpleModule();
- simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
- objectMapper.registerModule(simpleModule);
- objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
-
- @Test
- public void serDeTest() {
- InputStream is = MemoryUsageDeserializerTest.class.getResourceAsStream("/MemoryUsageObjects.json");
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
-
- try {
- while (br.ready()) {
- String line = br.readLine();
- if (!StringUtils.isEmpty(line)) {
- LOGGER.info("raw: {}", line);
- MemoryUsageBroadcast broadcast = objectMapper.readValue(line, MemoryUsageBroadcast.class);
-
- LOGGER.info("activity: {}", broadcast);
-
- assertNotNull(broadcast);
- assertNotNull(broadcast.getVerbose());
- assertNotNull(broadcast.getObjectPendingFinalizationCount());
- assertNotNull(broadcast.getHeapMemoryUsage());
- assertNotNull(broadcast.getNonHeapMemoryUsage());
- assertNotNull(broadcast.getName());
- }
- }
- } catch (Exception e) {
- LOGGER.error("Exception while testing serializability: {}", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/test/resources/MemoryUsageObjects.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/test/resources/MemoryUsageObjects.json b/streams-pojo/src/test/resources/MemoryUsageObjects.json
deleted file mode 100644
index f955fdc..0000000
--- a/streams-pojo/src/test/resources/MemoryUsageObjects.json
+++ /dev/null
@@ -1 +0,0 @@
-{"canonicalName":"java.lang:type=Memory","pattern":false,"domainPattern":false,"propertyPattern":false,"propertyListPattern":false,"propertyValuePattern":false,"domain":"java.lang","keyPropertyList":{"type":"Memory"},"keyPropertyListString":"type=Memory","canonicalKeyPropertyListString":"type=Memory"}
\ No newline at end of file