You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/11/09 17:43:02 UTC

kafka git commit: MINOR: Add HttpMetricsReporter for system tests

Repository: kafka
Updated Branches:
  refs/heads/trunk 0653a895f -> 718dda114


MINOR: Add HttpMetricsReporter for system tests

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #4072 from ewencp/http-metrics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/718dda11
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/718dda11
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/718dda11

Branch: refs/heads/trunk
Commit: 718dda1144629d824f4bdb8ff73fbd531a22723a
Parents: 0653a89
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Nov 9 09:42:46 2017 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Nov 9 09:42:46 2017 -0800

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 build.gradle                                    |   3 +
 .../kafka/common/metrics/KafkaMetric.java       |   3 +-
 tests/README.md                                 |  13 +-
 tests/docker/ducker-ak                          |   3 +-
 tests/kafkatest/directory_layout/kafka_path.py  |  16 +-
 tests/kafkatest/services/monitor/http.py        | 226 +++++++++++++
 .../services/performance/performance.py         |   2 +-
 .../performance/producer_performance.py         |  36 +-
 tests/kafkatest/tests/client/quota_test.py      |   8 +-
 tests/kafkatest/tests/core/throttling_test.py   |  10 +-
 .../kafka/tools/PushHttpMetricsReporter.java    | 319 ++++++++++++++++++
 .../tools/PushHttpMetricsReporterTest.java      | 333 +++++++++++++++++++
 13 files changed, 924 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 9be5c58..8a22c9b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -39,7 +39,7 @@ before_install:
 
 script:
   - ./gradlew rat
-  - ./gradlew releaseTarGz && /bin/bash ./tests/docker/run_tests.sh
+  - ./gradlew systemTestLibs && /bin/bash ./tests/docker/run_tests.sh
 
 services:
   - docker

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 9a221d9..abd73bb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -861,6 +861,9 @@ project(':tools') {
     testCompile project(':clients')
     testCompile libs.junit
     testCompile project(':clients').sourceSets.test.output
+    testCompile libs.easymock
+    testCompile libs.powermockJunit4
+    testCompile libs.powermockEasymock
   }
 
   javadoc {

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index 37c2b1b..f04981a 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -28,7 +28,8 @@ public final class KafkaMetric implements Metric {
     private final MetricValueProvider<?> metricValueProvider;
     private MetricConfig config;
 
-    KafkaMetric(Object lock, MetricName metricName, MetricValueProvider<?> valueProvider,
+    // public for testing
+    public KafkaMetric(Object lock, MetricName metricName, MetricValueProvider<?> valueProvider,
             MetricConfig config, Time time) {
         this.metricName = metricName;
         this.lock = lock;

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/README.md
----------------------------------------------------------------------
diff --git a/tests/README.md b/tests/README.md
index cecb990..f0ffdf5 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -11,8 +11,7 @@ Running tests using docker
 Docker containers can be used for running kafka system tests locally.
 * Requirements
   - Docker 1.12.3 (or higher) is installed and running on the machine.
-  - Test require a single kafka_*SNAPSHOT.tgz to be present in core/build/distributions, as well as the system test libs.
-   This can be done by running ./gradlew clean systemTestLibs releaseTarGz
+  - Test require that Kafka, including system test libs, is built. This can be done by running ./gradlew clean systemTestLibs
 * Run all tests
 ```
 bash tests/docker/run_tests.sh
@@ -93,7 +92,7 @@ This produces a json about the build which looks like:
     ],
     "before_install": null,
     "script": [
-      "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+      "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
     ],
     "services": [
       "docker"
@@ -141,7 +140,7 @@ This produces a json about the build which looks like:
         "jdk": "oraclejdk8",
         "before_install": null,
         "script": [
-          "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+          "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
         ],
         "services": [
           "docker"
@@ -178,7 +177,7 @@ This produces a json about the build which looks like:
         "jdk": "oraclejdk8",
         "before_install": null,
         "script": [
-          "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+          "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
         ],
         "services": [
           "docker"
@@ -228,7 +227,7 @@ The resulting json looks like:
       "jdk": "oraclejdk8",
       "before_install": null,
       "script": [
-        "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+        "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
       ],
       "services": [
         "docker"
@@ -265,7 +264,7 @@ The resulting json looks like:
       "jdk": "oraclejdk8",
       "before_install": null,
       "script": [
-        "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+        "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
       ],
       "services": [
         "docker"

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/docker/ducker-ak
----------------------------------------------------------------------
diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak
index 381754b..f7eae49 100755
--- a/tests/docker/ducker-ak
+++ b/tests/docker/ducker-ak
@@ -413,8 +413,7 @@ ducker_test() {
         fi
     done
     must_pushd "${kafka_dir}"
-    ls ./core/build/distributions/kafka_*.tgz &> /dev/null
-    [[ $? -eq 0 ]] || die "Failed to find core/build/distributions/kafka_*.tgz.  Did you run ./gradlew releaseTarGz?"
+    (test -f ./gradlew || gradle) && ./gradlew systemTestLibs
     must_popd
     cmd="cd /opt/kafka-dev && ducktape --cluster-file /opt/kafka-dev/tests/docker/build/cluster.json $args"
     echo "docker exec -it ducker01 bash -c \"${cmd}\""

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/directory_layout/kafka_path.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/directory_layout/kafka_path.py b/tests/kafkatest/directory_layout/kafka_path.py
index ece8be5..40dda22 100644
--- a/tests/kafkatest/directory_layout/kafka_path.py
+++ b/tests/kafkatest/directory_layout/kafka_path.py
@@ -106,25 +106,25 @@ class KafkaSystemTestPathResolver(object):
         self.context = context
         self.project = project
 
-    def home(self, node_or_version=DEV_BRANCH):
+    def home(self, node_or_version=DEV_BRANCH, project=None):
         version = self._version(node_or_version)
-        home_dir = self.project
+        home_dir = project or self.project
         if version is not None:
             home_dir += "-%s" % str(version)
 
         return os.path.join(KAFKA_INSTALL_ROOT, home_dir)
 
-    def bin(self, node_or_version=DEV_BRANCH):
+    def bin(self, node_or_version=DEV_BRANCH, project=None):
         version = self._version(node_or_version)
-        return os.path.join(self.home(version), "bin")
+        return os.path.join(self.home(version, project=project), "bin")
 
-    def script(self, script_name, node_or_version=DEV_BRANCH):
+    def script(self, script_name, node_or_version=DEV_BRANCH, project=None):
         version = self._version(node_or_version)
-        return os.path.join(self.bin(version), script_name)
+        return os.path.join(self.bin(version, project=project), script_name)
 
-    def jar(self, jar_name, node_or_version=DEV_BRANCH):
+    def jar(self, jar_name, node_or_version=DEV_BRANCH, project=None):
         version = self._version(node_or_version)
-        return os.path.join(self.home(version), JARS[str(version)][jar_name])
+        return os.path.join(self.home(version, project=project), JARS[str(version)][jar_name])
 
     def scratch_space(self, service_instance):
         return os.path.join(SCRATCH_ROOT, service_instance.service_id)

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/services/monitor/http.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/http.py b/tests/kafkatest/services/monitor/http.py
new file mode 100644
index 0000000..83324df
--- /dev/null
+++ b/tests/kafkatest/services/monitor/http.py
@@ -0,0 +1,226 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+from collections import defaultdict, namedtuple
+import json
+from threading import Thread
+from select import select
+import socket
+
+MetricKey = namedtuple('MetricKey', ['host', 'client_id', 'name', 'group', 'tags'])
+MetricValue = namedtuple('MetricValue', ['time', 'value'])
+
+# Python's logging library doesn't define anything more detailed than DEBUG, but we'd like a finer-grained setting for
+# for highly detailed messages, e.g. logging every single incoming request.
+TRACE = 5
+
+
+class HttpMetricsCollector(object):
+    """
+    HttpMetricsCollector enables collection of metrics from various Kafka clients instrumented with the
+    PushHttpMetricsReporter. It starts a web server locally and provides the necessary configuration for clients
+    to automatically report metrics data to this server. It also provides basic functionality for querying the
+    recorded metrics. This class can be used either as a mixin or standalone object.
+    """
+
+    # The port to listen on on the worker node, which will be forwarded to the port listening on this driver node
+    REMOTE_PORT = 6789
+
+    def __init__(self, **kwargs):
+        """
+        Create a new HttpMetricsCollector
+        :param period the period, in seconds, between updates that the metrics reporter configuration should define.
+               defaults to reporting once per second
+        :param args:
+        :param kwargs:
+        """
+        self._http_metrics_period = kwargs.pop('period', 1)
+
+        super(HttpMetricsCollector, self).__init__(**kwargs)
+
+        # TODO: currently we maintain just a simple map from all key info -> value. However, some key fields are far
+        # more common to filter on, so we'd want to index by them, e.g. host, client.id, metric name.
+        self._http_metrics = defaultdict(list)
+
+        self._httpd = HTTPServer(('', 0), _MetricsReceiver)
+        self._httpd.parent = self
+        self._httpd.metrics = self._http_metrics
+
+        self._http_metrics_thread = Thread(target=self._run_http_metrics_httpd,
+                                           name='http-metrics-thread[%s]' % str(self))
+        self._http_metrics_thread.start()
+
+        self._forwarders = {}
+
+    @property
+    def http_metrics_url(self):
+        """
+        :return: the URL to use when reporting metrics
+        """
+        return "http://%s:%d" % ("localhost", self.REMOTE_PORT)
+
+    @property
+    def http_metrics_client_configs(self):
+        """
+        Get client configurations that can be used to report data to this collector. Put these in a properties file for
+        clients (e.g. console producer or consumer) to have them push metrics to this driver. Note that in some cases
+        (e.g. streams, connect) these settings may need to be prefixed.
+        :return: a dictionary of client configurations that will direct a client to report metrics to this collector
+        """
+        return {
+            "metric.reporters": "org.apache.kafka.tools.PushHttpMetricsReporter",
+            "metrics.url": self.http_metrics_url,
+            "metrics.period": self._http_metrics_period,
+        }
+
+    def start_node(self, node):
+        local_port = self._httpd.socket.getsockname()[1]
+        self.logger.debug('HttpMetricsCollector listening on %s', local_port)
+        self._forwarders[self.idx(node)] = _ReverseForwarder(self.logger, node, self.REMOTE_PORT, local_port)
+
+        super(HttpMetricsCollector, self).start_node(node)
+
+    def stop(self):
+        super(HttpMetricsCollector, self).stop()
+
+        if self._http_metrics_thread:
+            self.logger.debug("Shutting down metrics httpd")
+            self._httpd.shutdown()
+            self._http_metrics_thread.join()
+            self.logger.debug("Finished shutting down metrics httpd")
+
+    def stop_node(self, node):
+        super(HttpMetricsCollector, self).stop_node(node)
+
+        idx = self.idx(node)
+        self._forwarders[idx].stop()
+        del self._forwarders[idx]
+
+    def metrics(self, host=None, client_id=None, name=None, group=None, tags=None):
+        """
+        Get any collected metrics that match the specified parameters, yielding each as a tuple of
+        (key, [<timestamp, value>, ...]) values.
+        """
+        for k, values in self._http_metrics.iteritems():
+            if ((host is None or host == k.host) and
+                    (client_id is None or client_id == k.client_id) and
+                    (name is None or name == k.name) and
+                    (group is None or group == k.group) and
+                    (tags is None or tags == k.tags)):
+                yield (k, values)
+
+    def _run_http_metrics_httpd(self):
+        self._httpd.serve_forever()
+
+
+class _MetricsReceiver(BaseHTTPRequestHandler):
+    """
+    HTTP request handler that accepts requests from the PushHttpMetricsReporter and stores them back into the parent
+    HttpMetricsCollector
+    """
+
+    def log_message(self, format, *args, **kwargs):
+        # Don't do any logging here so we get rid of the mostly useless per-request Apache log-style info that spams
+        # the debug log
+        pass
+
+    def do_POST(self):
+        data = self.rfile.read(int(self.headers['Content-Length']))
+        data = json.loads(data)
+        self.server.parent.logger.log(TRACE, "POST %s\n\n%s\n%s", self.path, self.headers,
+                                      json.dumps(data, indent=4, separators=(',', ': ')))
+        self.send_response(204)
+        self.end_headers()
+
+        client = data['client']
+        host = client['host']
+        client_id = client['client_id']
+        ts = client['time']
+        metrics = data['metrics']
+        for raw_metric in metrics:
+            name = raw_metric['name']
+            group = raw_metric['group']
+            # Convert to tuple of pairs because dicts & lists are unhashable
+            tags = tuple([(k, v) for k, v in raw_metric['tags'].iteritems()]),
+            value = raw_metric['value']
+
+            key = MetricKey(host=host, client_id=client_id, name=name, group=group, tags=tags)
+            metric_value = MetricValue(time=ts, value=value)
+
+            self.server.metrics[key].append(metric_value)
+
+
+class _ReverseForwarder(object):
+    """
+    Runs reverse forwarding of a port on a node to a local port. This allows you to setup a server on the test driver
+    that only assumes we have basic SSH access that ducktape guarantees is available for worker nodes.
+    """
+
+    def __init__(self, logger, node, remote_port, local_port):
+        self.logger = logger
+        self._node = node
+        self._local_port = local_port
+
+        self.logger.debug('Forwarding %s port %d to driver port %d', node, remote_port, local_port)
+
+        self._stopping = False
+
+        self._transport = node.account.ssh_client.get_transport()
+        self._transport.request_port_forward('', remote_port)
+
+        self._accept_thread = Thread(target=self._accept)
+        self._accept_thread.start()
+
+    def stop(self):
+        self._stopping = True
+        self._accept_thread.join(30)
+        if self._accept_thread.isAlive():
+            raise RuntimeError("Failed to stop reverse forwarder on %s", self._node)
+
+    def _accept(self):
+        while not self._stopping:
+            chan = self._transport.accept(1)
+            if chan is None:
+                continue
+            thr = Thread(target=self._handler, args=(chan,))
+            thr.setDaemon(True)
+            thr.start()
+
+    def _handler(self, chan):
+        sock = socket.socket()
+        try:
+            sock.connect(("localhost", self._local_port))
+        except Exception as e:
+            self.logger.error('Forwarding request to port %d failed: %r', self._local_port, e)
+            return
+
+        self.logger.log(TRACE, 'Connected! Tunnel open %r -> %r -> %d', chan.origin_addr, chan.getpeername(),
+                        self._local_port)
+        while True:
+            r, w, x = select([sock, chan], [], [])
+            if sock in r:
+                data = sock.recv(1024)
+                if len(data) == 0:
+                    break
+                chan.send(data)
+            if chan in r:
+                data = chan.recv(1024)
+                if len(data) == 0:
+                    break
+                sock.send(data)
+        chan.close()
+        sock.close()
+        self.logger.log(TRACE, 'Tunnel closed from %r', chan.origin_addr)

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/services/performance/performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py
index ec2b63e..0d1f5b0 100644
--- a/tests/kafkatest/services/performance/performance.py
+++ b/tests/kafkatest/services/performance/performance.py
@@ -19,7 +19,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
 class PerformanceService(KafkaPathResolverMixin, BackgroundThreadService):
 
-    def __init__(self, context, num_nodes, root="/mnt/*", stop_timeout_sec=30):
+    def __init__(self, context=None, num_nodes=0, root="/mnt/*", stop_timeout_sec=30):
         super(PerformanceService, self).__init__(context, num_nodes)
         self.results = [None] * self.num_nodes
         self.stats = [[] for x in range(self.num_nodes)]

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 38bcc8c..18790a7 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -18,14 +18,14 @@ import time
 from ducktape.utils.util import wait_until
 from ducktape.cluster.remoteaccount import RemoteCommandError
 
-from kafkatest.directory_layout.kafka_path import  TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
-from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
+from kafkatest.services.monitor.http import HttpMetricsCollector
 from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.version import DEV_BRANCH, V_0_9_0_0
 
 
-class ProducerPerformanceService(JmxMixin, PerformanceService):
+class ProducerPerformanceService(HttpMetricsCollector, PerformanceService):
 
     PERSISTENT_ROOT = "/mnt/producer_performance"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "producer_performance.stdout")
@@ -35,11 +35,9 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
     LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
 
     def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=DEV_BRANCH, settings=None,
-                 intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=None):
+                 intermediate_stats=False, client_id="producer-performance"):
 
-        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [],
-                          root=ProducerPerformanceService.PERSISTENT_ROOT)
-        PerformanceService.__init__(self, context, num_nodes)
+        super(ProducerPerformanceService, self).__init__(context=context, num_nodes=num_nodes)
 
         self.logs = {
             "producer_performance_stdout": {
@@ -50,12 +48,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
                 "collect_default": True},
             "producer_performance_log": {
                 "path": ProducerPerformanceService.LOG_FILE,
-                "collect_default": True},
-            "jmx_log": {
-                "path": "/mnt/jmx_tool.log",
-                "collect_default": jmx_object_names is not None
-            }
-
+                "collect_default": True}
         }
 
         self.kafka = kafka
@@ -83,9 +76,9 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
         args = self.args.copy()
         args.update({
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
-            'jmx_port': self.jmx_port,
             'client_id': self.client_id,
-            'kafka_run_class': self.path.script("kafka-run-class.sh", node)
+            'kafka_run_class': self.path.script("kafka-run-class.sh", node),
+            'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.iteritems()])
             })
 
         cmd = ""
@@ -95,14 +88,15 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
             # tool from the development branch
             tools_jar = self.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
             tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
+            tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
 
-            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
-            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
+            for jar in (tools_jar, tools_dependant_libs_jar):
+                cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % jar
             cmd += "export CLASSPATH; "
 
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % ProducerPerformanceService.LOG4J_CONFIG
-        cmd += "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s KAFKA_HEAP_OPTS=\"-XX:+HeapDumpOnOutOfMemoryError\" %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \
-              "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
+        cmd += "KAFKA_OPTS=%(kafka_opts)s KAFKA_HEAP_OPTS=\"-XX:+HeapDumpOnOutOfMemoryError\" %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \
+              "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s %(metrics_props)s" % args
 
         self.security_config.setup_node(node)
         if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
@@ -126,7 +120,6 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
         return len(self.pids(node)) > 0
 
     def _worker(self, idx, node):
-
         node.account.ssh("mkdir -p %s" % ProducerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
 
         # Create and upload log properties
@@ -145,13 +138,10 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
         if first_line is None:
             raise Exception("No output from ProducerPerformance")
 
-        self.start_jmx_tool(idx, node)
         wait_until(lambda: not self.alive(node), timeout_sec=1200, backoff_sec=2, err_msg="ProducerPerformance failed to finish")
         elapsed = time.time() - start
         self.logger.debug("ProducerPerformance process ran for %s seconds" % elapsed)
 
-        self.read_jmx_output(idx, node)
-
         # parse producer output from file
         last = None
         producer_output = node.account.ssh_capture("cat %s" % ProducerPerformanceService.STDOUT_CAPTURE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index ae86c28..47a6a96 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -136,8 +136,7 @@ class QuotaTest(Test):
         # Produce all messages
         producer = ProducerPerformanceService(
             self.test_context, producer_num, self.kafka,
-            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
-            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate'])
+            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id)
 
         producer.run()
 
@@ -178,8 +177,9 @@ class QuotaTest(Test):
             msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
 
         # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
-        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
-        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
+        producer_maximum_bps = max(
+            metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
+        )
         producer_quota_bps = self.quota_config.producer_quota
         self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
         if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/tests/core/throttling_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index 94a4010..586bac9 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -150,9 +150,7 @@ class ThrottlingTest(ProduceConsumeValidateTest):
         bulk_producer = ProducerPerformanceService(
             context=self.test_context, num_nodes=1, kafka=self.kafka,
             topic=self.topic, num_records=self.num_records,
-            record_size=self.record_size, throughput=-1, client_id=producer_id,
-            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id],
-            jmx_attributes=['outgoing-byte-rate'])
+            record_size=self.record_size, throughput=-1, client_id=producer_id)
 
 
         self.producer = VerifiableProducer(context=self.test_context,
@@ -173,3 +171,9 @@ class ThrottlingTest(ProduceConsumeValidateTest):
         bulk_producer.run()
         self.run_produce_consume_validate(core_test_action=
                                           lambda: self.reassign_partitions(bounce_brokers, self.throttle))
+
+        self.logger.debug("Bulk producer outgoing-byte-rates: %s",
+                          (metric.value for k, metrics in
+                          bulk_producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer_id) for
+                          metric in metrics)
+        )
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
new file mode 100644
index 0000000..d5839a4
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * MetricsReporter that aggregates metrics data and reports it via HTTP requests to a configurable
+ * webhook endpoint in JSON format.
+ *
+ * This is an internal class used for system tests and does not provide any compatibility guarantees.
+ */
+public class PushHttpMetricsReporter implements MetricsReporter {
+    private static final Logger log = LoggerFactory.getLogger(PushHttpMetricsReporter.class);
+
+    private static final String METRICS_PREFIX = "metrics.";
+    static final String METRICS_URL_CONFIG = METRICS_PREFIX + "url";
+    static final String METRICS_PERIOD_CONFIG = METRICS_PREFIX + "period";
+    static final String METRICS_HOST_CONFIG = METRICS_PREFIX + "host";
+    static final String CLIENT_ID_CONFIG = ProducerConfig.CLIENT_ID_CONFIG;
+
+    private static final Map<String, String> HEADERS = new LinkedHashMap<>();
+    static {
+        HEADERS.put("Content-Type", "application/json");
+    }
+
+    private final Object lock = new Object();
+    private final Time time;
+    private final ScheduledExecutorService executor;
+    // The set of metrics are updated in init/metricChange/metricRemoval
+    private final Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
+    private final ObjectMapper json = new ObjectMapper();
+
+    // Non-final because these are set via configure()
+    private URL url;
+    private String host;
+    private String clientId;
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(METRICS_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The URL to report metrics to")
+            .define(METRICS_PERIOD_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH,
+                    "The frequency at which metrics should be reported, in second")
+            .define(METRICS_HOST_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW,
+                    "The hostname to report with each metric; if null, defaults to the FQDN that can be automatically" +
+                            "determined")
+            .define(CLIENT_ID_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
+                    "Client ID to identify the application, generally inherited from the " +
+                            "producer/consumer/streams/connect instance");
+
+    public PushHttpMetricsReporter() {
+        time = Time.SYSTEM;
+        executor = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    PushHttpMetricsReporter(Time mockTime, ScheduledExecutorService mockExecutor) {
+        time = mockTime;
+        executor = mockExecutor;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs, true) { };
+        try {
+            url = new URL(config.getString(METRICS_URL_CONFIG));
+        } catch (MalformedURLException e) {
+            throw new ConfigException("Malformed metrics.url", e);
+        }
+        int period = config.getInt(METRICS_PERIOD_CONFIG);
+        clientId = config.getString(CLIENT_ID_CONFIG);
+
+        host = config.getString(METRICS_HOST_CONFIG);
+        if (host == null) {
+            try {
+                host = InetAddress.getLocalHost().getCanonicalHostName();
+            } catch (UnknownHostException e) {
+                throw new ConfigException("Failed to get canonical hostname", e);
+            }
+        }
+
+        executor.scheduleAtFixedRate(new HttpReporter(), period, period, TimeUnit.SECONDS);
+
+        log.info("Configured PushHttpMetricsReporter for {} to report every {} seconds", url, period);
+    }
+
+    @Override
+    public void init(List<KafkaMetric> initMetrics) {
+        synchronized (lock) {
+            for (KafkaMetric metric : initMetrics) {
+                log.debug("Adding metric {}", metric.metricName());
+                metrics.put(metric.metricName(), metric);
+            }
+        }
+    }
+
+    @Override
+    public void metricChange(KafkaMetric metric) {
+        synchronized (lock) {
+            log.debug("Updating metric {}", metric.metricName());
+            metrics.put(metric.metricName(), metric);
+        }
+    }
+
+    @Override
+    public void metricRemoval(KafkaMetric metric) {
+        synchronized (lock) {
+            log.debug("Removing metric {}", metric.metricName());
+            metrics.remove(metric.metricName());
+        }
+    }
+
+    @Override
+    public void close() {
+        executor.shutdown();
+        try {
+            executor.awaitTermination(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            throw new InterruptException("Interrupted when shutting down PushHttpMetricsReporter", e);
+        }
+    }
+
+    private class HttpReporter implements Runnable {
+        @Override
+        public void run() {
+            long now = time.milliseconds();
+            final List<MetricValue> samples;
+            synchronized (lock) {
+                samples = new ArrayList<>(metrics.size());
+                for (KafkaMetric metric : metrics.values()) {
+                    MetricName name = metric.metricName();
+                    double value = metric.value();
+                    samples.add(new MetricValue(name.name(), name.group(), name.tags(), value));
+                }
+            }
+
+            MetricsReport report = new MetricsReport(new MetricClientInfo(host, clientId, now), samples);
+
+            log.trace("Reporting {} metrics to {}", samples.size(), url);
+            HttpURLConnection connection = null;
+            try {
+                connection = newHttpConnection(url);
+                connection.setRequestMethod("POST");
+                // connection.getResponseCode() implicitly calls getInputStream, so always set to true.
+                // On the other hand, leaving this out breaks nothing.
+                connection.setDoInput(true);
+                connection.setRequestProperty("Content-Type", "application/json");
+                byte[] data = json.writeValueAsBytes(report);
+                connection.setRequestProperty("Content-Length", Integer.toString(data.length));
+                connection.setRequestProperty("Accept", "*/*");
+                connection.setUseCaches(false);
+
+                connection.setDoOutput(true);
+
+                try (OutputStream os = connection.getOutputStream()) {
+                    os.write(data);
+                    os.flush();
+                }
+
+                int responseCode = connection.getResponseCode();
+                if (responseCode >= 400) {
+                    InputStream is = connection.getErrorStream();
+                    String msg = readResponse(is);
+                    log.error("Error reporting metrics, {}: {}", responseCode, msg);
+                } else if (responseCode >= 300) {
+                    log.error("PushHttpMetricsReporter does not currently support redirects, saw {}", responseCode);
+                } else {
+                    log.info("Finished reporting metrics with response code {}", responseCode);
+                }
+            } catch (Exception e) {
+                log.error("Error reporting metrics", e);
+                throw new KafkaException("Failed to report current metrics", e);
+            } finally {
+                if (connection != null) {
+                    connection.disconnect();
+                }
+            }
+        }
+    }
+
+    // Static package-private so unit tests can use a mock connection
+    static HttpURLConnection newHttpConnection(URL url) throws IOException {
+        return (HttpURLConnection) url.openConnection();
+    }
+
+    // Static package-private so unit tests can mock reading response
+    static String readResponse(InputStream is) {
+        try (Scanner s = new Scanner(is, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
+            return s.hasNext() ? s.next() : "";
+        }
+    }
+
+    private static class MetricsReport {
+        private final MetricClientInfo client;
+        private final Collection<MetricValue> metrics;
+
+        MetricsReport(MetricClientInfo client, Collection<MetricValue> metrics) {
+            this.client = client;
+            this.metrics = metrics;
+        }
+
+        @JsonProperty
+        public MetricClientInfo client() {
+            return client;
+        }
+
+        @JsonProperty
+        public Collection<MetricValue> metrics() {
+            return metrics;
+        }
+    }
+
+    private static class MetricClientInfo {
+        private final String host;
+        private final String clientId;
+        private final long time;
+
+        MetricClientInfo(String host, String clientId, long time) {
+            this.host = host;
+            this.clientId = clientId;
+            this.time = time;
+        }
+
+        @JsonProperty
+        public String host() {
+            return host;
+        }
+
+        @JsonProperty("client_id")
+        public String clientId() {
+            return clientId;
+        }
+
+        @JsonProperty
+        public long time() {
+            return time;
+        }
+    }
+
+    private static class MetricValue {
+
+        private final String name;
+        private final String group;
+        private final Map<String, String> tags;
+        private final Object value;
+
+        MetricValue(String name, String group, Map<String, String> tags, Object value) {
+            this.name = name;
+            this.group = group;
+            this.tags = tags;
+            this.value = value;
+        }
+
+        @JsonProperty
+        public String name() {
+            return name;
+        }
+
+        @JsonProperty
+        public String group() {
+            return group;
+        }
+
+        @JsonProperty
+        public Map<String, String> tags() {
+            return tags;
+        }
+
+        @JsonProperty
+        public Object value() {
+            return value;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java b/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java
new file mode 100644
index 0000000..1cd3799
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PushHttpMetricsReporter.class)
+public class PushHttpMetricsReporterTest {
+
+    private static final URL URL;
+    static {
+        try {
+            URL = new URL("http://fake:80");
+        } catch (MalformedURLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    private PushHttpMetricsReporter reporter;
+    private Time time = new MockTime();
+    @MockStrict
+    private ScheduledExecutorService executor;
+    private Capture<Runnable> reportRunnable = EasyMock.newCapture();
+    @MockStrict
+    private HttpURLConnection httpReq;
+    @MockStrict
+    private OutputStream httpOut;
+    private Capture<byte[]> httpPayload = EasyMock.newCapture();
+    @MockStrict
+    private InputStream httpErr;
+
+    @Before
+    public void setUp() {
+        reporter = new PushHttpMetricsReporter(time, executor);
+        PowerMock.mockStatic(PushHttpMetricsReporter.class);
+    }
+
+    @Test
+    public void testConfigureClose() throws Exception {
+        expectConfigure();
+        expectClose();
+
+        replayAll();
+
+        configure();
+        reporter.close();
+
+        verifyAll();
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testConfigureBadUrl() throws Exception {
+        Map<String, String> config = new HashMap<>();
+        config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, "malformed;url");
+        config.put(PushHttpMetricsReporter.METRICS_PERIOD_CONFIG, "5");
+        reporter.configure(config);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testConfigureMissingPeriod() throws Exception {
+        Map<String, String> config = new HashMap<>();
+        config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, URL.toString());
+        reporter.configure(config);
+    }
+
+    @Test
+    public void testNoMetrics() throws Exception {
+        expectConfigure();
+        expectRequest(200);
+        expectClose();
+
+        replayAll();
+
+        configure();
+        reportRunnable.getValue().run();
+        JsonNode payload = new ObjectMapper().readTree(httpPayload.getValue());
+        assertTrue(payload.isObject());
+
+        assertPayloadHasClientInfo(payload);
+
+        // Should contain an empty list of metrics, i.e. we report updates even if there are no metrics to report to
+        // indicate liveness
+        JsonNode metrics = payload.get("metrics");
+        assertTrue(metrics.isArray());
+        assertEquals(0, metrics.size());
+
+        reporter.close();
+
+        verifyAll();
+    }
+
+    // For error conditions, we expect them to come with a response body that we can read & log
+    @Test
+    public void testClientError() throws Exception {
+        expectConfigure();
+        expectRequest(400, true);
+        expectClose();
+
+        replayAll();
+
+        configure();
+        reportRunnable.getValue().run();
+
+        reporter.close();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testServerError() throws Exception {
+        expectConfigure();
+        expectRequest(500, true);
+        expectClose();
+
+        replayAll();
+
+        configure();
+        reportRunnable.getValue().run();
+
+        reporter.close();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testMetricValues() throws Exception {
+        expectConfigure();
+        expectRequest(200);
+        expectClose();
+
+        replayAll();
+
+        configure();
+        KafkaMetric metric1 = new KafkaMetric(
+                new Object(),
+                new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
+                new ImmutableValue(1.0),
+                null,
+                time
+        );
+        KafkaMetric newMetric1 = new KafkaMetric(
+                new Object(),
+                new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
+                new ImmutableValue(-1.0),
+                null,
+                time
+        );
+        KafkaMetric metric2 = new KafkaMetric(
+                new Object(),
+                new MetricName("name2", "group2", "desc2", Collections.singletonMap("key2", "value2")),
+                new ImmutableValue(2.0),
+                null,
+                time
+        );
+        KafkaMetric metric3 = new KafkaMetric(
+                new Object(),
+                new MetricName("name3", "group3", "desc3", Collections.singletonMap("key3", "value3")),
+                new ImmutableValue(3.0),
+                null,
+                time
+        );
+        reporter.init(Arrays.asList(metric1, metric2));
+        reporter.metricChange(newMetric1); // added in init, modified
+        reporter.metricChange(metric3); // added by change
+        reporter.metricRemoval(metric2); // added in init, deleted by removal
+
+        reportRunnable.getValue().run();
+        JsonNode payload = new ObjectMapper().readTree(httpPayload.getValue());
+        assertTrue(payload.isObject());
+        assertPayloadHasClientInfo(payload);
+
+        // We should be left with the modified version of metric1 and metric3
+        JsonNode metrics = payload.get("metrics");
+        assertTrue(metrics.isArray());
+        assertEquals(2, metrics.size());
+
+        JsonNode m1 = metrics.get(0);
+        assertEquals("name1", m1.get("name").textValue());
+        assertEquals("group1", m1.get("group").textValue());
+        JsonNode m1Tags = m1.get("tags");
+        assertTrue(m1Tags.isObject());
+        assertEquals(1, m1Tags.size());
+        assertEquals("value1", m1Tags.get("key1").textValue());
+        assertEquals(-1.0, m1.get("value").doubleValue(), 0.0);
+
+        JsonNode m3 = metrics.get(1);
+        assertEquals("name3", m3.get("name").textValue());
+        assertEquals("group3", m3.get("group").textValue());
+        JsonNode m3Tags = m3.get("tags");
+        assertTrue(m3Tags.isObject());
+        assertEquals(1, m3Tags.size());
+        assertEquals("value3", m3Tags.get("key3").textValue());
+        assertEquals(3.0, m3.get("value").doubleValue(), 0.0);
+
+        reporter.close();
+
+        verifyAll();
+    }
+
+    private void expectConfigure() {
+        EasyMock.expect(
+                executor.scheduleAtFixedRate(EasyMock.capture(reportRunnable), EasyMock.eq(5L), EasyMock.eq(5L), EasyMock.eq(TimeUnit.SECONDS))
+        ).andReturn(null); // return value not expected to be used
+    }
+
+    private void configure() {
+        Map<String, String> config = new HashMap<>();
+        config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, URL.toString());
+        config.put(PushHttpMetricsReporter.METRICS_PERIOD_CONFIG, "5");
+        reporter.configure(config);
+    }
+
+    private void expectRequest(int returnStatus) throws Exception {
+        expectRequest(returnStatus, false);
+    }
+
+    // Expect that a request is made with the given response code
+    private void expectRequest(int returnStatus, boolean readResponse) throws Exception {
+        EasyMock.expect(PushHttpMetricsReporter.newHttpConnection(URL)).andReturn(httpReq);
+        httpReq.setRequestMethod("POST");
+        EasyMock.expectLastCall();
+        httpReq.setDoInput(true);
+        EasyMock.expectLastCall();
+        httpReq.setRequestProperty("Content-Type", "application/json");
+        EasyMock.expectLastCall();
+        httpReq.setRequestProperty(EasyMock.eq("Content-Length"), EasyMock.anyString());
+        EasyMock.expectLastCall();
+        httpReq.setRequestProperty("Accept", "*/*");
+        EasyMock.expectLastCall();
+        httpReq.setUseCaches(false);
+        EasyMock.expectLastCall();
+        httpReq.setDoOutput(true);
+        EasyMock.expectLastCall();
+        EasyMock.expect(httpReq.getOutputStream()).andReturn(httpOut);
+        httpOut.write(EasyMock.capture(httpPayload));
+        EasyMock.expectLastCall();
+        httpOut.flush();
+        EasyMock.expectLastCall();
+        httpOut.close();
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(httpReq.getResponseCode()).andReturn(returnStatus);
+
+        if (readResponse)
+            expectReadResponse();
+
+        httpReq.disconnect();
+        EasyMock.expectLastCall();
+    }
+
+    private void assertPayloadHasClientInfo(JsonNode payload) throws UnknownHostException {
+        // Should contain client info...
+        JsonNode client = payload.get("client");
+        assertTrue(client.isObject());
+        assertEquals(InetAddress.getLocalHost().getCanonicalHostName(), client.get("host").textValue());
+        assertEquals("", client.get("client_id").textValue());
+        assertEquals(time.milliseconds(), client.get("time").longValue());
+    }
+
+    private void expectReadResponse() throws Exception {
+        EasyMock.expect(httpReq.getErrorStream()).andReturn(httpErr);
+        EasyMock.expect(PushHttpMetricsReporter.readResponse(httpErr)).andReturn("error response message");
+        EasyMock.expectLastCall();
+    }
+
+    private void expectClose() throws Exception {
+        executor.shutdown();
+        EasyMock.expect(executor.awaitTermination(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class))).andReturn(true);
+    }
+
+    private static class ImmutableValue implements Measurable {
+        private final double value;
+
+        public ImmutableValue(double value) {
+            this.value = value;
+        }
+
+        @Override
+        public double measure(MetricConfig config, long now) {
+            return value;
+        }
+    }
+}