You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/13 06:24:25 UTC

[3/6] incubator-streams git commit: Merge

Merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/356f6472
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/356f6472
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/356f6472

Branch: refs/heads/master
Commit: 356f647200c4304e27a92b8b2051c84da47b31af
Parents: b03b1b4
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Wed Nov 12 17:38:40 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Wed Nov 12 17:38:40 2014 -0600

----------------------------------------------------------------------
 .../jackson/DatumStatusCounterDeserializer.java | 76 +++++++++++++++++
 .../jackson/MemoryUsageDeserializer.java        | 79 ++++++++++++++++++
 .../jackson/StreamsTaskCounterDeserializer.java | 88 ++++++++++++++++++++
 .../jackson/ThroughputQueueDeserializer.java    | 87 +++++++++++++++++++
 .../org/apache/streams/pojo/json/Broadcast.json | 13 +++
 .../pojo/json/DatumStatusCounterBroadcast.json  | 22 +++++
 .../streams/pojo/json/MemoryUsageBroadcast.json | 30 +++++++
 .../pojo/json/StreamsTaskCounterBroadcast.json  | 38 +++++++++
 .../pojo/json/ThroughputQueueBroadcast.json     | 38 +++++++++
 .../jackson/MemoryUsageDeserializerTest.java    | 77 +++++++++++++++++
 .../src/test/resources/MemoryUsageObjects.json  |  1 +
 .../jackson/DatumStatusCounterDeserializer.java | 76 -----------------
 .../jackson/MemoryUsageDeserializer.java        | 79 ------------------
 .../jackson/StreamsTaskCounterDeserializer.java | 88 --------------------
 .../jackson/ThroughputQueueDeserializer.java    | 87 -------------------
 .../org/apache/streams/pojo/json/Broadcast.json | 13 ---
 .../pojo/json/DatumStatusCounterBroadcast.json  | 22 -----
 .../streams/pojo/json/MemoryUsageBroadcast.json | 30 -------
 .../pojo/json/StreamsTaskCounterBroadcast.json  | 38 ---------
 .../pojo/json/ThroughputQueueBroadcast.json     | 38 ---------
 .../jackson/MemoryUsageDeserializerTest.java    | 77 -----------------
 .../src/test/resources/MemoryUsageObjects.json  |  1 -
 22 files changed, 549 insertions(+), 549 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
new file mode 100644
index 0000000..8bfa28b
--- /dev/null
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.streams.pojo.json.DatumStatusCounterBroadcast;
+import org.slf4j.Logger;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+
+/**
+ * Jackson deserializer that produces a {@link DatumStatusCounterBroadcast} snapshot.
+ *
+ * <p>The incoming JSON is only used to obtain the {@code canonicalName} of an MBean. The
+ * "Failed" and "Passed" values are read from the platform MBean server at the moment of
+ * deserialization, not from the JSON itself.
+ */
+public class DatumStatusCounterDeserializer extends JsonDeserializer<DatumStatusCounterBroadcast> {
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DatumStatusCounterDeserializer.class);
+
+    public DatumStatusCounterDeserializer() {
+
+    }
+
+    /**
+     * Reads the {@code canonicalName} field from the JSON, looks the MBean up on the platform
+     * MBean server and copies its "Failed" and "Passed" attributes into a new broadcast.
+     *
+     * @param jsonParser parser positioned at the JSON object that carries {@code canonicalName}
+     * @param deserializationContext unused
+     * @return the populated broadcast; attributes that could not be read are left unset.
+     *         Returns {@code null} if the JSON cannot be read or the MBean cannot be resolved.
+     */
+    @Override
+    public DatumStatusCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+        try {
+            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+            DatumStatusCounterBroadcast datumStatusCounterBroadcast = new DatumStatusCounterBroadcast();
+            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+
+            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+            MBeanInfo info = server.getMBeanInfo(name);
+            datumStatusCounterBroadcast.setName(name.toString());
+
+            for (MBeanAttributeInfo attribute : info.getAttributes()) {
+                String attributeName = attribute.getName();
+                try {
+                    switch (attributeName) {
+                        case "Failed":
+                            datumStatusCounterBroadcast.setFailed((boolean) server.getAttribute(name, attributeName));
+                            break;
+                        case "Passed":
+                            datumStatusCounterBroadcast.setPassed((boolean) server.getAttribute(name, attributeName));
+                            break;
+                    }
+                } catch (Exception e) {
+                    // Best effort: one unreadable attribute must not discard the others.
+                    // The trailing Throwable is logged with its stack trace; the placeholder is
+                    // filled by the attribute name.
+                    LOGGER.error("Exception trying to read attribute {} while deserializing DatumStatusCounterBroadcast object", attributeName, e);
+                }
+            }
+
+            return datumStatusCounterBroadcast;
+        } catch (Exception e) {
+            // No "{}" here: a lone Throwable argument is never substituted into the message.
+            LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object", e);
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
new file mode 100644
index 0000000..43c9239
--- /dev/null
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.streams.pojo.json.MemoryUsageBroadcast;
+import org.slf4j.Logger;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeDataSupport;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+
+/**
+ * Jackson deserializer that produces a {@link MemoryUsageBroadcast} snapshot.
+ *
+ * <p>The incoming JSON is only used to obtain the {@code canonicalName} of an MBean. The
+ * memory figures are read from the platform MBean server at the moment of deserialization,
+ * not from the JSON itself.
+ */
+public class MemoryUsageDeserializer extends JsonDeserializer<MemoryUsageBroadcast> {
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MemoryUsageDeserializer.class);
+
+    public MemoryUsageDeserializer() {
+
+    }
+
+    /**
+     * Reads the {@code canonicalName} field from the JSON, looks the MBean up on the platform
+     * MBean server and copies "Verbose", "ObjectPendingFinalizationCount" and the {@code used}
+     * component of "HeapMemoryUsage" / "NonHeapMemoryUsage" into a new broadcast.
+     *
+     * @param jsonParser parser positioned at the JSON object that carries {@code canonicalName}
+     * @param deserializationContext unused
+     * @return the populated broadcast; attributes that could not be read are left unset.
+     *         Returns {@code null} if the JSON cannot be read or the MBean cannot be resolved.
+     */
+    @Override
+    public MemoryUsageBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+        try {
+            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+            MemoryUsageBroadcast memoryUsageBroadcast = new MemoryUsageBroadcast();
+            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+
+            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+            MBeanInfo info = server.getMBeanInfo(name);
+            memoryUsageBroadcast.setName(name.toString());
+
+            for (MBeanAttributeInfo attribute : info.getAttributes()) {
+                String attributeName = attribute.getName();
+                try {
+                    switch (attributeName) {
+                        case "Verbose":
+                            memoryUsageBroadcast.setVerbose((boolean) server.getAttribute(name, attributeName));
+                            break;
+                        case "ObjectPendingFinalizationCount":
+                            memoryUsageBroadcast.setObjectPendingFinalizationCount(Long.parseLong(server.getAttribute(name, attributeName).toString()));
+                            break;
+                        case "HeapMemoryUsage":
+                            // The attribute is composite data; only its "used" entry is reported.
+                            memoryUsageBroadcast.setHeapMemoryUsage((Long) ((CompositeDataSupport) server.getAttribute(name, attributeName)).get("used"));
+                            break;
+                        case "NonHeapMemoryUsage":
+                            memoryUsageBroadcast.setNonHeapMemoryUsage((Long) ((CompositeDataSupport) server.getAttribute(name, attributeName)).get("used"));
+                            break;
+                    }
+                } catch (Exception e) {
+                    // Best effort, matching the sibling deserializers: one unreadable attribute
+                    // must not discard the others.
+                    LOGGER.error("Exception trying to read attribute {} while deserializing MemoryUsageBroadcast object", attributeName, e);
+                }
+            }
+
+            return memoryUsageBroadcast;
+        } catch (Exception e) {
+            // No "{}" here: a lone Throwable argument is never substituted into the message.
+            LOGGER.error("Exception trying to deserialize MemoryUsageBroadcast object", e);
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
new file mode 100644
index 0000000..8b65bf3
--- /dev/null
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast;
+import org.slf4j.Logger;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+
+/**
+ * Jackson deserializer that produces a {@link StreamsTaskCounterBroadcast} snapshot.
+ *
+ * <p>The incoming JSON is only used to obtain the {@code canonicalName} of an MBean. The
+ * counter values are read from the platform MBean server at the moment of deserialization,
+ * not from the JSON itself.
+ */
+public class StreamsTaskCounterDeserializer extends JsonDeserializer<StreamsTaskCounterBroadcast> {
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsTaskCounterDeserializer.class);
+
+    public StreamsTaskCounterDeserializer() {
+
+    }
+
+    /**
+     * Reads the {@code canonicalName} field from the JSON, looks the MBean up on the platform
+     * MBean server and copies "ErrorRate", "NumEmitted", "NumReceived", "NumUnhandledErrors",
+     * "AvgTime" and "MaxTime" into a new broadcast.
+     *
+     * @param jsonParser parser positioned at the JSON object that carries {@code canonicalName}
+     * @param deserializationContext unused
+     * @return the populated broadcast; attributes that could not be read are left unset.
+     *         Returns {@code null} if the JSON cannot be read or the MBean cannot be resolved.
+     */
+    @Override
+    public StreamsTaskCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+        try {
+            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+            StreamsTaskCounterBroadcast streamsTaskCounterBroadcast = new StreamsTaskCounterBroadcast();
+            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+
+            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+            MBeanInfo info = server.getMBeanInfo(name);
+            streamsTaskCounterBroadcast.setName(name.toString());
+
+            for (MBeanAttributeInfo attribute : info.getAttributes()) {
+                String attributeName = attribute.getName();
+                try {
+                    switch (attributeName) {
+                        case "ErrorRate":
+                            streamsTaskCounterBroadcast.setErrorRate((double) server.getAttribute(name, attributeName));
+                            break;
+                        case "NumEmitted":
+                            streamsTaskCounterBroadcast.setNumEmitted((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "NumReceived":
+                            streamsTaskCounterBroadcast.setNumReceived((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "NumUnhandledErrors":
+                            streamsTaskCounterBroadcast.setNumUnhandledErrors((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "AvgTime":
+                            streamsTaskCounterBroadcast.setAvgTime((double) server.getAttribute(name, attributeName));
+                            break;
+                        case "MaxTime":
+                            streamsTaskCounterBroadcast.setMaxTime((long) server.getAttribute(name, attributeName));
+                            break;
+                    }
+                } catch (Exception e) {
+                    // Best effort: one unreadable attribute must not discard the others.
+                    LOGGER.error("Exception while trying to read attribute {} while deserializing StreamsTaskCounterBroadcast object", attributeName, e);
+                }
+            }
+
+            return streamsTaskCounterBroadcast;
+        } catch (Exception e) {
+            // No "{}" here: a lone Throwable argument is never substituted into the message.
+            LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object", e);
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
new file mode 100644
index 0000000..e4d883d
--- /dev/null
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
+import org.slf4j.Logger;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+
+/**
+ * Jackson deserializer that produces a {@link ThroughputQueueBroadcast} snapshot.
+ *
+ * <p>The incoming JSON is only used to obtain the {@code canonicalName} of an MBean. The
+ * queue statistics are read from the platform MBean server at the moment of deserialization,
+ * not from the JSON itself.
+ */
+public class ThroughputQueueDeserializer extends JsonDeserializer<ThroughputQueueBroadcast> {
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThroughputQueueDeserializer.class);
+
+    public ThroughputQueueDeserializer() {
+
+    }
+
+    /**
+     * Reads the {@code canonicalName} field from the JSON, looks the MBean up on the platform
+     * MBean server and copies "CurrentSize", "AvgWait", "MaxWait", "Removed", "Added" and
+     * "Throughput" into a new broadcast.
+     *
+     * @param jsonParser parser positioned at the JSON object that carries {@code canonicalName}
+     * @param deserializationContext unused
+     * @return the populated broadcast; attributes that could not be read are left unset.
+     *         Returns {@code null} if the JSON cannot be read or the MBean cannot be resolved.
+     */
+    @Override
+    public ThroughputQueueBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+        try {
+            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+            ThroughputQueueBroadcast throughputQueueBroadcast = new ThroughputQueueBroadcast();
+            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+
+            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+            MBeanInfo info = server.getMBeanInfo(name);
+            throughputQueueBroadcast.setName(name.toString());
+
+            for (MBeanAttributeInfo attribute : info.getAttributes()) {
+                String attributeName = attribute.getName();
+                try {
+                    switch (attributeName) {
+                        case "CurrentSize":
+                            throughputQueueBroadcast.setCurrentSize((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "AvgWait":
+                            throughputQueueBroadcast.setAvgWait((double) server.getAttribute(name, attributeName));
+                            break;
+                        case "MaxWait":
+                            throughputQueueBroadcast.setMaxWait((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "Removed":
+                            throughputQueueBroadcast.setRemoved((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "Added":
+                            throughputQueueBroadcast.setAdded((long) server.getAttribute(name, attributeName));
+                            break;
+                        case "Throughput":
+                            throughputQueueBroadcast.setThroughput((double) server.getAttribute(name, attributeName));
+                            break;
+                    }
+                } catch (Exception e) {
+                    // Best effort: one unreadable attribute must not discard the others.
+                    LOGGER.error("Exception while trying to read attribute {} while deserializing ThroughputQueueBroadcast object", attributeName, e);
+                }
+            }
+
+            return throughputQueueBroadcast;
+        } catch (Exception e) {
+            // Previously swallowed silently; log it like the sibling deserializers do so a
+            // null result can be diagnosed.
+            LOGGER.error("Exception while trying to deserialize ThroughputQueueBroadcast object", e);
+            return null;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
new file mode 100644
index 0000000..4d7f87b
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
@@ -0,0 +1,13 @@
+{
+  "type" : "object",
+  "title" : "object",
+  "javaType": "org.apache.streams.pojo.json.Broadcast",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description" : "Base Broadcast class",
+  "properties" : {
+    "name": {
+      "type": "string",
+      "description": "Name of the MBean"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
new file mode 100644
index 0000000..ac86d21
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
@@ -0,0 +1,22 @@
+{
+  "type" : "object",
+  "title" : "object",
+  "extends" : {"$ref": "./Broadcast.json"},
+  "javaType": "org.apache.streams.pojo.json.DatumStatusCounterBroadcast",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description" : "Snapshot of the DatumStatusCounter",
+  "properties" : {
+    "passed": {
+      "type": "boolean",
+      "description": "Number of objects that have passed"
+    },
+    "failed": {
+      "type": "boolean",
+      "description": "Number of objects that have failed"
+    },
+    "name": {
+      "type": "string",
+      "description": "Name of the MBean"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
new file mode 100644
index 0000000..4a17d41
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
@@ -0,0 +1,30 @@
+{
+  "type" : "object",
+  "title" : "object",
+  "extends" : {"$ref": "./Broadcast.json"},
+  "javaType": "org.apache.streams.pojo.json.MemoryUsageBroadcast",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description" : "Snapshot of current memory usage",
+  "properties" : {
+    "verbose" : {
+      "type" : "boolean",
+      "description": "Whether or not this is verbose"
+    },
+    "objectPendingFinalizationCount": {
+      "type": "integer",
+      "description": "The number of objects that are pending finalization"
+    },
+    "heapMemoryUsage": {
+      "type": "integer",
+      "description": "The amount of heap memory we are currently using"
+    },
+    "nonHeapMemoryUsage": {
+      "type": "integer",
+      "description": "The amount of non-heap memory we are using"
+    },
+    "name": {
+      "type": "string",
+      "description": "The name of this MBean"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
new file mode 100644
index 0000000..d599697
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
@@ -0,0 +1,38 @@
+{
+  "type" : "object",
+  "title" : "object",
+  "javaType": "org.apache.streams.pojo.json.StreamsTaskCounterBroadcast",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends" : {"$ref": "./Broadcast.json"},
+  "description" : "Snapshot of a Stream Task Counter",
+  "properties" : {
+    "errorRate": {
+      "type": "double",
+      "description": "Rate of failed items"
+    },
+    "numEmitted": {
+      "type": "integer",
+      "description": "Number of items that have been emitted"
+    },
+    "numReceived": {
+      "type": "integer",
+      "description": "Number of items this Task has received"
+    },
+    "numUnhandledErrors": {
+      "type": "integer",
+      "description": "Number of unhandled errors"
+    },
+    "avgTime": {
+      "type": "double",
+      "description": "Average amount of time an item spent in this Task"
+    },
+    "maxTime": {
+      "type": "integer",
+      "description": "Longest amount of time an item spent in this Task"
+    },
+    "name": {
+      "type": "string",
+      "description": "Name of the MBean"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
new file mode 100644
index 0000000..60796db
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
@@ -0,0 +1,38 @@
+{
+  "type" : "object",
+  "title" : "object",
+  "extends" : {"$ref": "./Broadcast.json"},
+  "javaType": "org.apache.streams.pojo.json.ThroughputQueueBroadcast",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description" : "Snapshot of a ThroughputQueue's performance",
+  "properties" : {
+    "currentSize": {
+      "type": "integer",
+      "description": "Current size of the queue"
+    },
+    "avgWait": {
+      "type": "double",
+      "description": "Average wait time"
+    },
+    "maxWait": {
+      "type": "double",
+      "description": "Maximum wait time"
+    },
+    "removed": {
+      "type": "integer",
+      "description": "Number of elements removed from the queue"
+    },
+    "added": {
+      "type": "integer",
+      "description": "Number of elements added to the queue"
+    },
+    "throughput": {
+      "type": "double",
+      "description": "Number of elements that have passed through the queue per second"
+    },
+    "name": {
+      "type": "string",
+      "description": "Name of the MBean"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java b/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
new file mode 100644
index 0000000..c0ba7fb
--- /dev/null
+++ b/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.pojo.json.*;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Checks that {@link MemoryUsageDeserializer} turns each line of
+ * {@code /MemoryUsageObjects.json} into a fully populated {@link MemoryUsageBroadcast}.
+ */
+public class MemoryUsageDeserializerTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MemoryUsageDeserializerTest.class);
+    private ObjectMapper objectMapper;
+
+    /** Builds a mapper with the deserializer under test registered for {@link MemoryUsageBroadcast}. */
+    @Before
+    public void setup() {
+        objectMapper = new StreamsJacksonMapper();
+        SimpleModule simpleModule = new SimpleModule();
+        simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
+        objectMapper.registerModule(simpleModule);
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    /**
+     * Deserializes every non-empty line of the fixture and asserts that all broadcast fields
+     * were populated.
+     *
+     * @throws Exception on any I/O or parsing failure, so that such failures fail the test
+     *         instead of being logged and ignored
+     */
+    @Test
+    public void serDeTest() throws Exception {
+        InputStream is = MemoryUsageDeserializerTest.class.getResourceAsStream("/MemoryUsageObjects.json");
+        assertNotNull("Test resource /MemoryUsageObjects.json not found on classpath", is);
+
+        // try-with-resources closes the reader (and the underlying stream) even when an
+        // assertion fails. The charset is explicit so the test does not depend on the
+        // platform default.
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8))) {
+            String line;
+            // readLine() returning null is the end-of-stream signal; ready() only reports
+            // whether a read would block.
+            while ((line = br.readLine()) != null) {
+                if (StringUtils.isEmpty(line)) {
+                    continue;
+                }
+
+                LOGGER.info("raw: {}", line);
+                MemoryUsageBroadcast broadcast = objectMapper.readValue(line, MemoryUsageBroadcast.class);
+
+                LOGGER.info("activity: {}", broadcast);
+
+                assertNotNull(broadcast);
+                assertNotNull(broadcast.getVerbose());
+                assertNotNull(broadcast.getObjectPendingFinalizationCount());
+                assertNotNull(broadcast.getHeapMemoryUsage());
+                assertNotNull(broadcast.getNonHeapMemoryUsage());
+                assertNotNull(broadcast.getName());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-monitoring/src/test/resources/MemoryUsageObjects.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/resources/MemoryUsageObjects.json b/streams-monitoring/src/test/resources/MemoryUsageObjects.json
new file mode 100644
index 0000000..f955fdc
--- /dev/null
+++ b/streams-monitoring/src/test/resources/MemoryUsageObjects.json
@@ -0,0 +1 @@
+{"canonicalName":"java.lang:type=Memory","pattern":false,"domainPattern":false,"propertyPattern":false,"propertyListPattern":false,"propertyValuePattern":false,"domain":"java.lang","keyPropertyList":{"type":"Memory"},"keyPropertyListString":"type=Memory","canonicalKeyPropertyListString":"type=Memory"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
deleted file mode 100644
index 8bfa28b..0000000
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.jackson;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.DatumStatusCounterBroadcast;
-import org.slf4j.Logger;
-
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-
-public class DatumStatusCounterDeserializer extends JsonDeserializer<DatumStatusCounterBroadcast> {
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DatumStatusCounterDeserializer.class);
-
-    public DatumStatusCounterDeserializer() {
-
-    }
-
-    @Override
-    public DatumStatusCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-        try {
-            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
-            DatumStatusCounterBroadcast datumStatusCounterBroadcast = new DatumStatusCounterBroadcast();
-            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
-
-            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
-            MBeanInfo info = server.getMBeanInfo(name);
-            datumStatusCounterBroadcast.setName(name.toString());
-
-            for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
-                try {
-                    switch(attribute.getName()) {
-                        case "Failed":
-                            datumStatusCounterBroadcast.setFailed((boolean) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "Passed":
-                            datumStatusCounterBroadcast.setPassed((boolean) server.getAttribute(name, attribute.getName()));
-                            break;
-                    }
-                } catch (Exception e) {
-                    LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", e);
-                }
-            }
-
-            return datumStatusCounterBroadcast;
-        } catch (Exception e) {
-            LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", e);
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
deleted file mode 100644
index 43c9239..0000000
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.jackson;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.MemoryUsageBroadcast;
-import org.slf4j.Logger;
-
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeDataSupport;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-
-public class MemoryUsageDeserializer extends JsonDeserializer<MemoryUsageBroadcast> {
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MemoryUsageDeserializer.class);
-
-    public MemoryUsageDeserializer() {
-
-    }
-
-    @Override
-    public MemoryUsageBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-        try {
-            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
-            MemoryUsageBroadcast memoryUsageBroadcast = new MemoryUsageBroadcast();
-            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
-
-            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
-            MBeanInfo info = server.getMBeanInfo(name);
-            memoryUsageBroadcast.setName(name.toString());
-
-            for(MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
-                switch(attribute.getName()) {
-                    case "Verbose":
-                        memoryUsageBroadcast.setVerbose((boolean) server.getAttribute(name, attribute.getName()));
-                        break;
-                    case "ObjectPendingFinalizationCount":
-                        memoryUsageBroadcast.setObjectPendingFinalizationCount(Long.parseLong(server.getAttribute(name, attribute.getName()).toString()));
-                        break;
-                    case "HeapMemoryUsage":
-                        memoryUsageBroadcast.setHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used"));
-                        break;
-                    case "NonHeapMemoryUsage":
-                        memoryUsageBroadcast.setNonHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used"));
-                        break;
-                }
-            }
-
-            return memoryUsageBroadcast;
-        } catch (Exception e) {
-            LOGGER.error("Exception trying to deserialize MemoryUsageDeserializer object: {}", e);
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
deleted file mode 100644
index 8b65bf3..0000000
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.jackson;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast;
-import org.slf4j.Logger;
-
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-
-public class StreamsTaskCounterDeserializer extends JsonDeserializer<StreamsTaskCounterBroadcast> {
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsTaskCounterDeserializer.class);
-
-    public StreamsTaskCounterDeserializer() {
-
-    }
-
-    @Override
-    public StreamsTaskCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-        try {
-            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
-            StreamsTaskCounterBroadcast streamsTaskCounterBroadcast = new StreamsTaskCounterBroadcast();
-            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
-
-            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
-            MBeanInfo info = server.getMBeanInfo(name);
-            streamsTaskCounterBroadcast.setName(name.toString());
-
-            for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
-                try {
-                    switch (attribute.getName()) {
-                        case "ErrorRate":
-                            streamsTaskCounterBroadcast.setErrorRate((double) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "NumEmitted":
-                            streamsTaskCounterBroadcast.setNumEmitted((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "NumReceived":
-                            streamsTaskCounterBroadcast.setNumReceived((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "NumUnhandledErrors":
-                            streamsTaskCounterBroadcast.setNumUnhandledErrors((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "AvgTime":
-                            streamsTaskCounterBroadcast.setAvgTime((double) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "MaxTime":
-                            streamsTaskCounterBroadcast.setMaxTime((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                    }
-                } catch (Exception e) {
-                    LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", e);
-                }
-            }
-
-            return streamsTaskCounterBroadcast;
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", e);
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
deleted file mode 100644
index e4d883d..0000000
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.jackson;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
-import org.slf4j.Logger;
-
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-
-public class ThroughputQueueDeserializer extends JsonDeserializer<ThroughputQueueBroadcast> {
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThroughputQueueDeserializer.class);
-
-    public ThroughputQueueDeserializer() {
-
-    }
-
-    @Override
-    public ThroughputQueueBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-        try {
-            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
-            ThroughputQueueBroadcast throughputQueueBroadcast = new ThroughputQueueBroadcast();
-            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
-
-            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
-            MBeanInfo info = server.getMBeanInfo(name);
-            throughputQueueBroadcast.setName(name.toString());
-
-            for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
-                try {
-                    switch(attribute.getName()) {
-                        case "CurrentSize":
-                            throughputQueueBroadcast.setCurrentSize((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "AvgWait":
-                            throughputQueueBroadcast.setAvgWait((double) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "MaxWait":
-                            throughputQueueBroadcast.setMaxWait((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "Removed":
-                            throughputQueueBroadcast.setRemoved((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "Added":
-                            throughputQueueBroadcast.setAdded((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "Throughput":
-                            throughputQueueBroadcast.setThroughput((double) server.getAttribute(name, attribute.getName()));
-                            break;
-                    }
-                } catch (Exception e) {
-                    LOGGER.error("Exception while trying to deserialize ThroughputQueueBroadcast object: {}", e);
-                }
-            }
-
-            return throughputQueueBroadcast;
-        } catch (Exception e) {
-            return null;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
deleted file mode 100644
index 4d7f87b..0000000
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
-  "type" : "object",
-  "title" : "object",
-  "javaType": "org.apache.streams.pojo.json.Broadcast",
-  "javaInterfaces": ["java.io.Serializable"],
-  "description" : "Base Broadcast class",
-  "properties" : {
-    "name": {
-      "type": "string",
-      "description": "Name of the MBean"
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
deleted file mode 100644
index ac86d21..0000000
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-  "type" : "object",
-  "title" : "object",
-  "extends" : {"$ref": "./Broadcast.json"},
-  "javaType": "org.apache.streams.pojo.json.DatumStatusCounterBroadcast",
-  "javaInterfaces": ["java.io.Serializable"],
-  "description" : "Snapshot of the DatumStatusCounter",
-  "properties" : {
-    "passed": {
-      "type": "boolean",
-      "description": "Number of objects that have passed"
-    },
-    "failed": {
-      "type": "boolean",
-      "description": "Number of objects that have faile"
-    },
-    "name": {
-      "type": "string",
-      "description": "Name of the MBean"
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
deleted file mode 100644
index 4a17d41..0000000
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
-  "type" : "object",
-  "title" : "object",
-  "extends" : {"$ref": "./Broadcast.json"},
-  "javaType": "org.apache.streams.pojo.json.MemoryUsageBroadcast",
-  "javaInterfaces": ["java.io.Serializable"],
-  "description" : "Snapshot of current memory usage",
-  "properties" : {
-    "verbose" : {
-      "type" : "boolean",
-      "description": "Whether or not this is verbose"
-    },
-    "objectPendingFinalizationCount": {
-      "type": "integer",
-      "description": "The number of objects that are pending finalization"
-    },
-    "heapMemoryUsage": {
-      "type": "integer",
-      "description": "The amount of heap memory we are currently using"
-    },
-    "nonHeapMemoryUsage": {
-      "type": "integer",
-      "description": "The amount of non-heap memory we are using"
-    },
-    "name": {
-      "type": "string",
-      "description": "The name of this MBean"
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
deleted file mode 100644
index d599697..0000000
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json
+++ /dev/null
@@ -1,38 +0,0 @@
-{
-  "type" : "object",
-  "title" : "object",
-  "javaType": "org.apache.streams.pojo.json.StreamsTaskCounterBroadcast",
-  "javaInterfaces": ["java.io.Serializable"],
-  "extends" : {"$ref": "./Broadcast.json"},
-  "description" : "Snapshot of a Stream Task Counter",
-  "properties" : {
-    "errorRate": {
-      "type": "double",
-      "description": "Rate of failed items"
-    },
-    "numEmitted": {
-      "type": "integer",
-      "description": "Number of items that have been emitted"
-    },
-    "numReceived": {
-      "type": "integer",
-      "description": "Number of items this Task has received"
-    },
-    "numUnhandledErrors": {
-      "type": "integer",
-      "description": "Number of unhandled errors"
-    },
-    "avgTime": {
-      "type": "double",
-      "description": "Average amount of time an item spent in this Task"
-    },
-    "maxTime": {
-      "type": "integer",
-      "description": "Longest amount of time an item spent in this Task"
-    },
-    "name": {
-      "type": "string",
-      "description": "Name of the MBean"
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
deleted file mode 100644
index 60796db..0000000
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json
+++ /dev/null
@@ -1,38 +0,0 @@
-{
-  "type" : "object",
-  "title" : "object",
-  "extends" : {"$ref": "./Broadcast.json"},
-  "javaType": "org.apache.streams.pojo.json.ThroughputQueueBroadcast",
-  "javaInterfaces": ["java.io.Serializable"],
-  "description" : "Snapshot of a ThroughputQueue's performance",
-  "properties" : {
-    "currentSize": {
-      "type": "integer",
-      "description": "Current size of the queue"
-    },
-    "avgWait": {
-      "type": "double",
-      "description": "Average wait time"
-    },
-    "maxWait": {
-      "type": "double",
-      "description": "Maximum wait time"
-    },
-    "removed": {
-      "type": "integer",
-      "description": "Number of elements removed from the queue"
-    },
-    "added": {
-      "type": "integer",
-      "description": "Number of elements added to the queue"
-    },
-    "throughput": {
-      "type": "double",
-      "description": "Number of elements that have passed through the queue per second"
-    },
-    "name": {
-      "type": "string",
-      "description": "Name of the MBean"
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java b/streams-pojo/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
deleted file mode 100644
index c0ba7fb..0000000
--- a/streams-pojo/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.jackson;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.pojo.json.*;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import static org.junit.Assert.assertNotNull;
-
-public class MemoryUsageDeserializerTest {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(MemoryUsageDeserializerTest.class);
-    private ObjectMapper objectMapper;
-
-    @Before
-    public void setup() {
-        objectMapper = new StreamsJacksonMapper();
-        SimpleModule simpleModule = new SimpleModule();
-        simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
-        objectMapper.registerModule(simpleModule);
-        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-    }
-
-    @Test
-    public void serDeTest() {
-        InputStream is = MemoryUsageDeserializerTest.class.getResourceAsStream("/MemoryUsageObjects.json");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                if (!StringUtils.isEmpty(line)) {
-                    LOGGER.info("raw: {}", line);
-                    MemoryUsageBroadcast broadcast = objectMapper.readValue(line, MemoryUsageBroadcast.class);
-
-                    LOGGER.info("activity: {}", broadcast);
-
-                    assertNotNull(broadcast);
-                    assertNotNull(broadcast.getVerbose());
-                    assertNotNull(broadcast.getObjectPendingFinalizationCount());
-                    assertNotNull(broadcast.getHeapMemoryUsage());
-                    assertNotNull(broadcast.getNonHeapMemoryUsage());
-                    assertNotNull(broadcast.getName());
-                }
-            }
-        } catch (Exception e) {
-            LOGGER.error("Exception while testing serializability: {}", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/356f6472/streams-pojo/src/test/resources/MemoryUsageObjects.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/test/resources/MemoryUsageObjects.json b/streams-pojo/src/test/resources/MemoryUsageObjects.json
deleted file mode 100644
index f955fdc..0000000
--- a/streams-pojo/src/test/resources/MemoryUsageObjects.json
+++ /dev/null
@@ -1 +0,0 @@
-{"canonicalName":"java.lang:type=Memory","pattern":false,"domainPattern":false,"propertyPattern":false,"propertyListPattern":false,"propertyValuePattern":false,"domain":"java.lang","keyPropertyList":{"type":"Memory"},"keyPropertyListString":"type=Memory","canonicalKeyPropertyListString":"type=Memory"}
\ No newline at end of file