You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/11/09 17:43:02 UTC
kafka git commit: MINOR: Add HttpMetricsReporter for system tests
Repository: kafka
Updated Branches:
refs/heads/trunk 0653a895f -> 718dda114
MINOR: Add HttpMetricsReporter for system tests
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #4072 from ewencp/http-metrics
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/718dda11
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/718dda11
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/718dda11
Branch: refs/heads/trunk
Commit: 718dda1144629d824f4bdb8ff73fbd531a22723a
Parents: 0653a89
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Nov 9 09:42:46 2017 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Nov 9 09:42:46 2017 -0800
----------------------------------------------------------------------
.travis.yml | 2 +-
build.gradle | 3 +
.../kafka/common/metrics/KafkaMetric.java | 3 +-
tests/README.md | 13 +-
tests/docker/ducker-ak | 3 +-
tests/kafkatest/directory_layout/kafka_path.py | 16 +-
tests/kafkatest/services/monitor/http.py | 226 +++++++++++++
.../services/performance/performance.py | 2 +-
.../performance/producer_performance.py | 36 +-
tests/kafkatest/tests/client/quota_test.py | 8 +-
tests/kafkatest/tests/core/throttling_test.py | 10 +-
.../kafka/tools/PushHttpMetricsReporter.java | 319 ++++++++++++++++++
.../tools/PushHttpMetricsReporterTest.java | 333 +++++++++++++++++++
13 files changed, 924 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 9be5c58..8a22c9b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -39,7 +39,7 @@ before_install:
script:
- ./gradlew rat
- - ./gradlew releaseTarGz && /bin/bash ./tests/docker/run_tests.sh
+ - ./gradlew systemTestLibs && /bin/bash ./tests/docker/run_tests.sh
services:
- docker
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 9a221d9..abd73bb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -861,6 +861,9 @@ project(':tools') {
testCompile project(':clients')
testCompile libs.junit
testCompile project(':clients').sourceSets.test.output
+ testCompile libs.easymock
+ testCompile libs.powermockJunit4
+ testCompile libs.powermockEasymock
}
javadoc {
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index 37c2b1b..f04981a 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -28,7 +28,8 @@ public final class KafkaMetric implements Metric {
private final MetricValueProvider<?> metricValueProvider;
private MetricConfig config;
- KafkaMetric(Object lock, MetricName metricName, MetricValueProvider<?> valueProvider,
+ // public for testing
+ public KafkaMetric(Object lock, MetricName metricName, MetricValueProvider<?> valueProvider,
MetricConfig config, Time time) {
this.metricName = metricName;
this.lock = lock;
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/README.md
----------------------------------------------------------------------
diff --git a/tests/README.md b/tests/README.md
index cecb990..f0ffdf5 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -11,8 +11,7 @@ Running tests using docker
Docker containers can be used for running kafka system tests locally.
* Requirements
- Docker 1.12.3 (or higher) is installed and running on the machine.
- - Test require a single kafka_*SNAPSHOT.tgz to be present in core/build/distributions, as well as the system test libs.
- This can be done by running ./gradlew clean systemTestLibs releaseTarGz
+ - Tests require that Kafka, including system test libs, is built. This can be done by running ./gradlew clean systemTestLibs
* Run all tests
```
bash tests/docker/run_tests.sh
@@ -93,7 +92,7 @@ This produces a json about the build which looks like:
],
"before_install": null,
"script": [
- "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+ "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
],
"services": [
"docker"
@@ -141,7 +140,7 @@ This produces a json about the build which looks like:
"jdk": "oraclejdk8",
"before_install": null,
"script": [
- "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+ "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
],
"services": [
"docker"
@@ -178,7 +177,7 @@ This produces a json about the build which looks like:
"jdk": "oraclejdk8",
"before_install": null,
"script": [
- "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+ "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
],
"services": [
"docker"
@@ -228,7 +227,7 @@ The resulting json looks like:
"jdk": "oraclejdk8",
"before_install": null,
"script": [
- "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+ "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
],
"services": [
"docker"
@@ -265,7 +264,7 @@ The resulting json looks like:
"jdk": "oraclejdk8",
"before_install": null,
"script": [
- "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+ "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
],
"services": [
"docker"
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/docker/ducker-ak
----------------------------------------------------------------------
diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak
index 381754b..f7eae49 100755
--- a/tests/docker/ducker-ak
+++ b/tests/docker/ducker-ak
@@ -413,8 +413,7 @@ ducker_test() {
fi
done
must_pushd "${kafka_dir}"
- ls ./core/build/distributions/kafka_*.tgz &> /dev/null
- [[ $? -eq 0 ]] || die "Failed to find core/build/distributions/kafka_*.tgz. Did you run ./gradlew releaseTarGz?"
+ (test -f ./gradlew || gradle) && ./gradlew systemTestLibs
must_popd
cmd="cd /opt/kafka-dev && ducktape --cluster-file /opt/kafka-dev/tests/docker/build/cluster.json $args"
echo "docker exec -it ducker01 bash -c \"${cmd}\""
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/directory_layout/kafka_path.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/directory_layout/kafka_path.py b/tests/kafkatest/directory_layout/kafka_path.py
index ece8be5..40dda22 100644
--- a/tests/kafkatest/directory_layout/kafka_path.py
+++ b/tests/kafkatest/directory_layout/kafka_path.py
@@ -106,25 +106,25 @@ class KafkaSystemTestPathResolver(object):
self.context = context
self.project = project
- def home(self, node_or_version=DEV_BRANCH):
+ def home(self, node_or_version=DEV_BRANCH, project=None):
version = self._version(node_or_version)
- home_dir = self.project
+ home_dir = project or self.project
if version is not None:
home_dir += "-%s" % str(version)
return os.path.join(KAFKA_INSTALL_ROOT, home_dir)
- def bin(self, node_or_version=DEV_BRANCH):
+ def bin(self, node_or_version=DEV_BRANCH, project=None):
version = self._version(node_or_version)
- return os.path.join(self.home(version), "bin")
+ return os.path.join(self.home(version, project=project), "bin")
- def script(self, script_name, node_or_version=DEV_BRANCH):
+ def script(self, script_name, node_or_version=DEV_BRANCH, project=None):
version = self._version(node_or_version)
- return os.path.join(self.bin(version), script_name)
+ return os.path.join(self.bin(version, project=project), script_name)
- def jar(self, jar_name, node_or_version=DEV_BRANCH):
+ def jar(self, jar_name, node_or_version=DEV_BRANCH, project=None):
version = self._version(node_or_version)
- return os.path.join(self.home(version), JARS[str(version)][jar_name])
+ return os.path.join(self.home(version, project=project), JARS[str(version)][jar_name])
def scratch_space(self, service_instance):
return os.path.join(SCRATCH_ROOT, service_instance.service_id)
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/services/monitor/http.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/http.py b/tests/kafkatest/services/monitor/http.py
new file mode 100644
index 0000000..83324df
--- /dev/null
+++ b/tests/kafkatest/services/monitor/http.py
@@ -0,0 +1,226 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+from collections import defaultdict, namedtuple
+import json
+from threading import Thread
+from select import select
+import socket
+
+MetricKey = namedtuple('MetricKey', ['host', 'client_id', 'name', 'group', 'tags'])
+MetricValue = namedtuple('MetricValue', ['time', 'value'])
+
+# Python's logging library doesn't define anything more detailed than DEBUG, but we'd like a finer-grained setting
+# for highly detailed messages, e.g. logging every single incoming request.
+TRACE = 5
+
+
+class HttpMetricsCollector(object):
+    """
+    HttpMetricsCollector enables collection of metrics from various Kafka clients instrumented with the
+    PushHttpMetricsReporter. It starts a web server locally and provides the necessary configuration for clients
+    to automatically report metrics data to this server. It also provides basic functionality for querying the
+    recorded metrics. This class can be used either as a mixin or standalone object.
+    """
+
+    # The port to listen on at the worker node, which will be forwarded to the port listening on this driver node
+    REMOTE_PORT = 6789
+
+    def __init__(self, **kwargs):
+        """
+        Create a new HttpMetricsCollector
+        :param period: the period, in seconds, between updates that the metrics reporter configuration should define.
+               defaults to reporting once per second
+        :param kwargs: all remaining keyword arguments are passed on unchanged to the next class in the MRO
+        """
+        self._http_metrics_period = kwargs.pop('period', 1)
+
+        super(HttpMetricsCollector, self).__init__(**kwargs)
+
+        # TODO: currently we maintain just a simple map from all key info -> value. However, some key fields are far
+        # more common to filter on, so we'd want to index by them, e.g. host, client.id, metric name.
+        self._http_metrics = defaultdict(list)
+
+        # Port 0 lets the OS pick a free port; start_node() reads the actual port back from the socket
+        self._httpd = HTTPServer(('', 0), _MetricsReceiver)
+        self._httpd.parent = self
+        self._httpd.metrics = self._http_metrics
+
+        self._http_metrics_thread = Thread(target=self._run_http_metrics_httpd,
+                                           name='http-metrics-thread[%s]' % str(self))
+        self._http_metrics_thread.start()
+
+        # Maps node index -> _ReverseForwarder tunnelling REMOTE_PORT on that node back to the local httpd
+        self._forwarders = {}
+
+    @property
+    def http_metrics_url(self):
+        """
+        :return: the URL to use when reporting metrics
+        """
+        return "http://%s:%d" % ("localhost", self.REMOTE_PORT)
+
+    @property
+    def http_metrics_client_configs(self):
+        """
+        Get client configurations that can be used to report data to this collector. Put these in a properties file for
+        clients (e.g. console producer or consumer) to have them push metrics to this driver. Note that in some cases
+        (e.g. streams, connect) these settings may need to be prefixed.
+        :return: a dictionary of client configurations that will direct a client to report metrics to this collector
+        """
+        return {
+            "metric.reporters": "org.apache.kafka.tools.PushHttpMetricsReporter",
+            "metrics.url": self.http_metrics_url,
+            "metrics.period": self._http_metrics_period,
+        }
+
+    def start_node(self, node):
+        """
+        Set up the reverse port forward from the node to the local metrics httpd, then start the node.
+        """
+        local_port = self._httpd.socket.getsockname()[1]
+        self.logger.debug('HttpMetricsCollector listening on %s', local_port)
+        self._forwarders[self.idx(node)] = _ReverseForwarder(self.logger, node, self.REMOTE_PORT, local_port)
+
+        super(HttpMetricsCollector, self).start_node(node)
+
+    def stop(self):
+        """
+        Stop the service and then the local metrics httpd. Calling this more than once is safe.
+        """
+        super(HttpMetricsCollector, self).stop()
+
+        if self._http_metrics_thread:
+            self.logger.debug("Shutting down metrics httpd")
+            self._httpd.shutdown()
+            self._http_metrics_thread.join()
+            # shutdown() only ends serve_forever(); server_close() is what releases the listening socket
+            self._httpd.server_close()
+            # Clear the reference so that a repeated stop() skips this branch
+            self._http_metrics_thread = None
+            self.logger.debug("Finished shutting down metrics httpd")
+
+    def stop_node(self, node):
+        """
+        Stop the node, then tear down its reverse forwarder if one exists.
+        """
+        super(HttpMetricsCollector, self).stop_node(node)
+
+        # A node that was never started, or was already stopped, has no forwarder; that is not an error
+        forwarder = self._forwarders.pop(self.idx(node), None)
+        if forwarder is not None:
+            forwarder.stop()
+
+    def metrics(self, host=None, client_id=None, name=None, group=None, tags=None):
+        """
+        Get any collected metrics that match the specified parameters, yielding each as a tuple of
+        (key, [<timestamp, value>, ...]) values. A parameter left as None matches every value of that field.
+        """
+        # Iterate over a snapshot: the httpd thread adds new keys to the dict while requests are being served
+        for k, values in list(self._http_metrics.items()):
+            if ((host is None or host == k.host) and
+                    (client_id is None or client_id == k.client_id) and
+                    (name is None or name == k.name) and
+                    (group is None or group == k.group) and
+                    (tags is None or tags == k.tags)):
+                yield (k, values)
+
+    def _run_http_metrics_httpd(self):
+        # Thread target: blocks until stop() calls self._httpd.shutdown()
+        self._httpd.serve_forever()
+
+
+class _MetricsReceiver(BaseHTTPRequestHandler):
+    """
+    HTTP request handler that accepts requests from the PushHttpMetricsReporter and stores them back into the parent
+    HttpMetricsCollector
+    """
+
+    def log_message(self, format, *args, **kwargs):
+        # Don't do any logging here so we get rid of the mostly useless per-request Apache log-style info that spams
+        # the debug log
+        pass
+
+    def do_POST(self):
+        """
+        Parse one JSON report and append each metric value to the server's metrics map, replying 204 No Content.
+
+        The body is read as an object with a 'client' entry (host, client_id, time) and a 'metrics' list whose
+        entries carry name, group, tags and value.
+        """
+        data = self.rfile.read(int(self.headers['Content-Length']))
+        data = json.loads(data)
+        self.server.parent.logger.log(TRACE, "POST %s\n\n%s\n%s", self.path, self.headers,
+                                      json.dumps(data, indent=4, separators=(',', ': ')))
+        self.send_response(204)
+        self.end_headers()
+
+        client = data['client']
+        host = client['host']
+        client_id = client['client_id']
+        ts = client['time']
+        metrics = data['metrics']
+        for raw_metric in metrics:
+            name = raw_metric['name']
+            group = raw_metric['group']
+            # Convert to a tuple of pairs because dicts & lists are unhashable. The pairs are sorted so that the
+            # same set of tags always produces the same key, whatever order the dict iterates in.
+            tags = tuple(sorted(raw_metric['tags'].items()))
+            value = raw_metric['value']
+
+            key = MetricKey(host=host, client_id=client_id, name=name, group=group, tags=tags)
+            metric_value = MetricValue(time=ts, value=value)
+
+            self.server.metrics[key].append(metric_value)
+
+
+class _ReverseForwarder(object):
+    """
+    Runs reverse forwarding of a port on a node to a local port. This allows you to setup a server on the test driver
+    that only assumes we have basic SSH access that ducktape guarantees is available for worker nodes.
+    """
+
+    def __init__(self, logger, node, remote_port, local_port):
+        """
+        Request the port forward over the node's SSH transport and start the thread that accepts connections.
+        :param logger: logger used for debug, trace and error messages
+        :param node: the worker node whose port is forwarded
+        :param remote_port: the port on the node that clients connect to
+        :param local_port: the port on this machine that forwarded connections are relayed to
+        """
+        self.logger = logger
+        self._node = node
+        self._remote_port = remote_port
+        self._local_port = local_port
+
+        self.logger.debug('Forwarding %s port %d to driver port %d', node, remote_port, local_port)
+
+        self._stopping = False
+
+        self._transport = node.account.ssh_client.get_transport()
+        self._transport.request_port_forward('', remote_port)
+
+        self._accept_thread = Thread(target=self._accept)
+        self._accept_thread.start()
+
+    def stop(self):
+        """
+        Stop accepting forwarded connections and cancel the port forward.
+        :raises RuntimeError: if the accept thread has not exited within 30 seconds
+        """
+        self._stopping = True
+        self._accept_thread.join(30)
+        if self._accept_thread.is_alive():
+            raise RuntimeError("Failed to stop reverse forwarder on %s" % self._node)
+        # Undo request_port_forward() from __init__ so the forward does not outlive this forwarder
+        self._transport.cancel_port_forward('', self._remote_port)
+
+    def _accept(self):
+        # Accept with a 1 second timeout so the _stopping flag is re-checked regularly
+        while not self._stopping:
+            chan = self._transport.accept(1)
+            if chan is None:
+                continue
+            thr = Thread(target=self._handler, args=(chan,))
+            thr.setDaemon(True)
+            thr.start()
+
+    def _handler(self, chan):
+        """
+        Relay data in both directions between a forwarded channel and the local port until either side closes.
+        """
+        sock = socket.socket()
+        try:
+            sock.connect(("localhost", self._local_port))
+        except Exception as e:
+            self.logger.error('Forwarding request to port %d failed: %r', self._local_port, e)
+            # Don't leak the socket or the accepted channel when the local side is unreachable
+            sock.close()
+            chan.close()
+            return
+
+        self.logger.log(TRACE, 'Connected! Tunnel open %r -> %r -> %d', chan.origin_addr, chan.getpeername(),
+                        self._local_port)
+        try:
+            while True:
+                r, w, x = select([sock, chan], [], [])
+                if sock in r:
+                    data = sock.recv(1024)
+                    if len(data) == 0:
+                        break
+                    # sendall() rather than send(): send() may write only part of the buffer
+                    chan.sendall(data)
+                if chan in r:
+                    data = chan.recv(1024)
+                    if len(data) == 0:
+                        break
+                    sock.sendall(data)
+        finally:
+            # Close both ends even if the relay loop fails part-way through
+            chan.close()
+            sock.close()
+        self.logger.log(TRACE, 'Tunnel closed from %r', chan.origin_addr)
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/services/performance/performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py
index ec2b63e..0d1f5b0 100644
--- a/tests/kafkatest/services/performance/performance.py
+++ b/tests/kafkatest/services/performance/performance.py
@@ -19,7 +19,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
class PerformanceService(KafkaPathResolverMixin, BackgroundThreadService):
- def __init__(self, context, num_nodes, root="/mnt/*", stop_timeout_sec=30):
+ def __init__(self, context=None, num_nodes=0, root="/mnt/*", stop_timeout_sec=30):
super(PerformanceService, self).__init__(context, num_nodes)
self.results = [None] * self.num_nodes
self.stats = [[] for x in range(self.num_nodes)]
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 38bcc8c..18790a7 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -18,14 +18,14 @@ import time
from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError
-from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
-from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
+from kafkatest.services.monitor.http import HttpMetricsCollector
from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH, V_0_9_0_0
-class ProducerPerformanceService(JmxMixin, PerformanceService):
+class ProducerPerformanceService(HttpMetricsCollector, PerformanceService):
PERSISTENT_ROOT = "/mnt/producer_performance"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "producer_performance.stdout")
@@ -35,11 +35,9 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=DEV_BRANCH, settings=None,
- intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=None):
+ intermediate_stats=False, client_id="producer-performance"):
- JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [],
- root=ProducerPerformanceService.PERSISTENT_ROOT)
- PerformanceService.__init__(self, context, num_nodes)
+ super(ProducerPerformanceService, self).__init__(context=context, num_nodes=num_nodes)
self.logs = {
"producer_performance_stdout": {
@@ -50,12 +48,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
"collect_default": True},
"producer_performance_log": {
"path": ProducerPerformanceService.LOG_FILE,
- "collect_default": True},
- "jmx_log": {
- "path": "/mnt/jmx_tool.log",
- "collect_default": jmx_object_names is not None
- }
-
+ "collect_default": True}
}
self.kafka = kafka
@@ -83,9 +76,9 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
args = self.args.copy()
args.update({
'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
- 'jmx_port': self.jmx_port,
'client_id': self.client_id,
- 'kafka_run_class': self.path.script("kafka-run-class.sh", node)
+ 'kafka_run_class': self.path.script("kafka-run-class.sh", node),
+ 'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.iteritems()])
})
cmd = ""
@@ -95,14 +88,15 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
# tool from the development branch
tools_jar = self.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
+ tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
- cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
- cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
+ for jar in (tools_jar, tools_dependant_libs_jar):
+ cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % jar
cmd += "export CLASSPATH; "
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % ProducerPerformanceService.LOG4J_CONFIG
- cmd += "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s KAFKA_HEAP_OPTS=\"-XX:+HeapDumpOnOutOfMemoryError\" %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \
- "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
+ cmd += "KAFKA_OPTS=%(kafka_opts)s KAFKA_HEAP_OPTS=\"-XX:+HeapDumpOnOutOfMemoryError\" %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \
+ "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s %(metrics_props)s" % args
self.security_config.setup_node(node)
if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
@@ -126,7 +120,6 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
return len(self.pids(node)) > 0
def _worker(self, idx, node):
-
node.account.ssh("mkdir -p %s" % ProducerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
# Create and upload log properties
@@ -145,13 +138,10 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
if first_line is None:
raise Exception("No output from ProducerPerformance")
- self.start_jmx_tool(idx, node)
wait_until(lambda: not self.alive(node), timeout_sec=1200, backoff_sec=2, err_msg="ProducerPerformance failed to finish")
elapsed = time.time() - start
self.logger.debug("ProducerPerformance process ran for %s seconds" % elapsed)
- self.read_jmx_output(idx, node)
-
# parse producer output from file
last = None
producer_output = node.account.ssh_capture("cat %s" % ProducerPerformanceService.STDOUT_CAPTURE)
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index ae86c28..47a6a96 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -136,8 +136,7 @@ class QuotaTest(Test):
# Produce all messages
producer = ProducerPerformanceService(
self.test_context, producer_num, self.kafka,
- topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
- jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate'])
+ topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id)
producer.run()
@@ -178,8 +177,9 @@ class QuotaTest(Test):
msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
# validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
- producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
- producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
+ producer_maximum_bps = max(
+ metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
+ )
producer_quota_bps = self.quota_config.producer_quota
self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tests/kafkatest/tests/core/throttling_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index 94a4010..586bac9 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -150,9 +150,7 @@ class ThrottlingTest(ProduceConsumeValidateTest):
bulk_producer = ProducerPerformanceService(
context=self.test_context, num_nodes=1, kafka=self.kafka,
topic=self.topic, num_records=self.num_records,
- record_size=self.record_size, throughput=-1, client_id=producer_id,
- jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id],
- jmx_attributes=['outgoing-byte-rate'])
+ record_size=self.record_size, throughput=-1, client_id=producer_id)
self.producer = VerifiableProducer(context=self.test_context,
@@ -173,3 +171,9 @@ class ThrottlingTest(ProduceConsumeValidateTest):
bulk_producer.run()
self.run_produce_consume_validate(core_test_action=
lambda: self.reassign_partitions(bounce_brokers, self.throttle))
+
+ # Build a list: a generator passed as a log argument would be rendered as "<generator object ...>"
+ self.logger.debug("Bulk producer outgoing-byte-rates: %s",
+     [metric.value for k, metrics in
+      bulk_producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer_id) for
+      metric in metrics]
+ )
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
new file mode 100644
index 0000000..d5839a4
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * MetricsReporter that aggregates metrics data and reports it via HTTP requests to a configurable
+ * webhook endpoint in JSON format.
+ *
+ * This is an internal class used for system tests and does not provide any compatibility guarantees.
+ */
+public class PushHttpMetricsReporter implements MetricsReporter {
+ private static final Logger log = LoggerFactory.getLogger(PushHttpMetricsReporter.class);
+
+ private static final String METRICS_PREFIX = "metrics.";
+ static final String METRICS_URL_CONFIG = METRICS_PREFIX + "url";
+ static final String METRICS_PERIOD_CONFIG = METRICS_PREFIX + "period";
+ static final String METRICS_HOST_CONFIG = METRICS_PREFIX + "host";
+ static final String CLIENT_ID_CONFIG = ProducerConfig.CLIENT_ID_CONFIG;
+
+ private static final Map<String, String> HEADERS = new LinkedHashMap<>();
+ static {
+ HEADERS.put("Content-Type", "application/json");
+ }
+
+ private final Object lock = new Object();
+ private final Time time;
+ private final ScheduledExecutorService executor;
+ // The set of metrics is updated in init/metricChange/metricRemoval; every access is made while holding lock
+ private final Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
+ private final ObjectMapper json = new ObjectMapper();
+
+ // Non-final because these are set via configure()
+ private URL url;
+ private String host;
+ private String clientId;
+
+ private static final ConfigDef CONFIG_DEF = new ConfigDef() // url and period have no default (required); host and client id are optional
+ .define(METRICS_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+ "The URL to report metrics to")
+ .define(METRICS_PERIOD_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH,
+ "The frequency at which metrics should be reported, in seconds")
+ .define(METRICS_HOST_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW,
+ "The hostname to report with each metric; if null, defaults to the FQDN that can be automatically " +
+ "determined")
+ .define(CLIENT_ID_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
+ "Client ID to identify the application, generally inherited from the " +
+ "producer/consumer/streams/connect instance");
+
+ public PushHttpMetricsReporter() {
+ time = Time.SYSTEM;
+ executor = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ PushHttpMetricsReporter(Time mockTime, ScheduledExecutorService mockExecutor) {
+ time = mockTime;
+ executor = mockExecutor;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs, true) { };
+ try {
+ url = new URL(config.getString(METRICS_URL_CONFIG));
+ } catch (MalformedURLException e) {
+ throw new ConfigException("Malformed metrics.url", e);
+ }
+ int period = config.getInt(METRICS_PERIOD_CONFIG);
+ clientId = config.getString(CLIENT_ID_CONFIG);
+
+ host = config.getString(METRICS_HOST_CONFIG);
+ if (host == null) {
+ try {
+ host = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ throw new ConfigException("Failed to get canonical hostname", e);
+ }
+ }
+
+ executor.scheduleAtFixedRate(new HttpReporter(), period, period, TimeUnit.SECONDS);
+
+ log.info("Configured PushHttpMetricsReporter for {} to report every {} seconds", url, period);
+ }
+
+ @Override
+ public void init(List<KafkaMetric> initMetrics) {
+ synchronized (lock) {
+ for (KafkaMetric metric : initMetrics) {
+ log.debug("Adding metric {}", metric.metricName());
+ metrics.put(metric.metricName(), metric);
+ }
+ }
+ }
+
+ @Override
+ public void metricChange(KafkaMetric metric) {
+ synchronized (lock) {
+ log.debug("Updating metric {}", metric.metricName());
+ metrics.put(metric.metricName(), metric);
+ }
+ }
+
+ @Override
+ public void metricRemoval(KafkaMetric metric) {
+ synchronized (lock) {
+ log.debug("Removing metric {}", metric.metricName());
+ metrics.remove(metric.metricName());
+ }
+ }
+
+ @Override
+ public void close() {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new InterruptException("Interrupted when shutting down PushHttpMetricsReporter", e);
+ }
+ }
+
+ private class HttpReporter implements Runnable {
+ @Override
+ public void run() {
+ long now = time.milliseconds();
+ final List<MetricValue> samples;
+ synchronized (lock) {
+ samples = new ArrayList<>(metrics.size());
+ for (KafkaMetric metric : metrics.values()) {
+ MetricName name = metric.metricName();
+ double value = metric.value();
+ samples.add(new MetricValue(name.name(), name.group(), name.tags(), value));
+ }
+ }
+
+ MetricsReport report = new MetricsReport(new MetricClientInfo(host, clientId, now), samples);
+
+ log.trace("Reporting {} metrics to {}", samples.size(), url);
+ HttpURLConnection connection = null;
+ try {
+ connection = newHttpConnection(url);
+ connection.setRequestMethod("POST");
+ // connection.getResponseCode() implicitly calls getInputStream, so always set to true.
+ // On the other hand, leaving this out breaks nothing.
+ connection.setDoInput(true);
+ connection.setRequestProperty("Content-Type", "application/json");
+ byte[] data = json.writeValueAsBytes(report);
+ connection.setRequestProperty("Content-Length", Integer.toString(data.length));
+ connection.setRequestProperty("Accept", "*/*");
+ connection.setUseCaches(false);
+
+ connection.setDoOutput(true);
+
+ try (OutputStream os = connection.getOutputStream()) {
+ os.write(data);
+ os.flush();
+ }
+
+ int responseCode = connection.getResponseCode();
+ if (responseCode >= 400) {
+ InputStream is = connection.getErrorStream();
+ String msg = readResponse(is);
+ log.error("Error reporting metrics, {}: {}", responseCode, msg);
+ } else if (responseCode >= 300) {
+ log.error("PushHttpMetricsReporter does not currently support redirects, saw {}", responseCode);
+ } else {
+ log.info("Finished reporting metrics with response code {}", responseCode);
+ }
+ } catch (Exception e) {
+ log.error("Error reporting metrics", e);
+ throw new KafkaException("Failed to report current metrics", e);
+ } finally {
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+ }
+ }
+
+ // Static package-private so unit tests can use a mock connection
+ static HttpURLConnection newHttpConnection(URL url) throws IOException {
+ return (HttpURLConnection) url.openConnection();
+ }
+
+ // Static package-private so unit tests can mock reading response
+ static String readResponse(InputStream is) {
+ try (Scanner s = new Scanner(is, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
+ return s.hasNext() ? s.next() : "";
+ }
+ }
+
+ private static class MetricsReport {
+ private final MetricClientInfo client;
+ private final Collection<MetricValue> metrics;
+
+ MetricsReport(MetricClientInfo client, Collection<MetricValue> metrics) {
+ this.client = client;
+ this.metrics = metrics;
+ }
+
+ @JsonProperty
+ public MetricClientInfo client() {
+ return client;
+ }
+
+ @JsonProperty
+ public Collection<MetricValue> metrics() {
+ return metrics;
+ }
+ }
+
+ private static class MetricClientInfo {
+ private final String host;
+ private final String clientId;
+ private final long time;
+
+ MetricClientInfo(String host, String clientId, long time) {
+ this.host = host;
+ this.clientId = clientId;
+ this.time = time;
+ }
+
+ @JsonProperty
+ public String host() {
+ return host;
+ }
+
+ @JsonProperty("client_id")
+ public String clientId() {
+ return clientId;
+ }
+
+ @JsonProperty
+ public long time() {
+ return time;
+ }
+ }
+
+ private static class MetricValue {
+
+ private final String name;
+ private final String group;
+ private final Map<String, String> tags;
+ private final Object value;
+
+ MetricValue(String name, String group, Map<String, String> tags, Object value) {
+ this.name = name;
+ this.group = group;
+ this.tags = tags;
+ this.value = value;
+ }
+
+ @JsonProperty
+ public String name() {
+ return name;
+ }
+
+ @JsonProperty
+ public String group() {
+ return group;
+ }
+
+ @JsonProperty
+ public Map<String, String> tags() {
+ return tags;
+ }
+
+ @JsonProperty
+ public Object value() {
+ return value;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/718dda11/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java b/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java
new file mode 100644
index 0000000..1cd3799
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PushHttpMetricsReporter.class)
+public class PushHttpMetricsReporterTest {
+
+ private static final URL URL;
+ static {
+ try {
+ URL = new URL("http://fake:80");
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ private PushHttpMetricsReporter reporter;
+ private Time time = new MockTime();
+ @MockStrict
+ private ScheduledExecutorService executor;
+ private Capture<Runnable> reportRunnable = EasyMock.newCapture();
+ @MockStrict
+ private HttpURLConnection httpReq;
+ @MockStrict
+ private OutputStream httpOut;
+ private Capture<byte[]> httpPayload = EasyMock.newCapture();
+ @MockStrict
+ private InputStream httpErr;
+
+ @Before
+ public void setUp() {
+ reporter = new PushHttpMetricsReporter(time, executor);
+ PowerMock.mockStatic(PushHttpMetricsReporter.class);
+ }
+
+ @Test
+ public void testConfigureClose() throws Exception {
+ expectConfigure();
+ expectClose();
+
+ replayAll();
+
+ configure();
+ reporter.close();
+
+ verifyAll();
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testConfigureBadUrl() throws Exception {
+ Map<String, String> config = new HashMap<>();
+ config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, "malformed;url");
+ config.put(PushHttpMetricsReporter.METRICS_PERIOD_CONFIG, "5");
+ reporter.configure(config);
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testConfigureMissingPeriod() throws Exception {
+ Map<String, String> config = new HashMap<>();
+ config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, URL.toString());
+ reporter.configure(config);
+ }
+
+ @Test
+ public void testNoMetrics() throws Exception {
+ expectConfigure();
+ expectRequest(200);
+ expectClose();
+
+ replayAll();
+
+ configure();
+ reportRunnable.getValue().run();
+ JsonNode payload = new ObjectMapper().readTree(httpPayload.getValue());
+ assertTrue(payload.isObject());
+
+ assertPayloadHasClientInfo(payload);
+
+ // Should contain an empty list of metrics, i.e. we report updates even if there are no metrics to report to
+ // indicate liveness
+ JsonNode metrics = payload.get("metrics");
+ assertTrue(metrics.isArray());
+ assertEquals(0, metrics.size());
+
+ reporter.close();
+
+ verifyAll();
+ }
+
+ // For error conditions, we expect them to come with a response body that we can read & log
+ @Test
+ public void testClientError() throws Exception {
+ expectConfigure();
+ expectRequest(400, true);
+ expectClose();
+
+ replayAll();
+
+ configure();
+ reportRunnable.getValue().run();
+
+ reporter.close();
+
+ verifyAll();
+ }
+
+ @Test
+ public void testServerError() throws Exception {
+ expectConfigure();
+ expectRequest(500, true);
+ expectClose();
+
+ replayAll();
+
+ configure();
+ reportRunnable.getValue().run();
+
+ reporter.close();
+
+ verifyAll();
+ }
+
+ @Test
+ public void testMetricValues() throws Exception {
+ expectConfigure();
+ expectRequest(200);
+ expectClose();
+
+ replayAll();
+
+ configure();
+ KafkaMetric metric1 = new KafkaMetric(
+ new Object(),
+ new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
+ new ImmutableValue(1.0),
+ null,
+ time
+ );
+ KafkaMetric newMetric1 = new KafkaMetric(
+ new Object(),
+ new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
+ new ImmutableValue(-1.0),
+ null,
+ time
+ );
+ KafkaMetric metric2 = new KafkaMetric(
+ new Object(),
+ new MetricName("name2", "group2", "desc2", Collections.singletonMap("key2", "value2")),
+ new ImmutableValue(2.0),
+ null,
+ time
+ );
+ KafkaMetric metric3 = new KafkaMetric(
+ new Object(),
+ new MetricName("name3", "group3", "desc3", Collections.singletonMap("key3", "value3")),
+ new ImmutableValue(3.0),
+ null,
+ time
+ );
+ reporter.init(Arrays.asList(metric1, metric2));
+ reporter.metricChange(newMetric1); // added in init, modified
+ reporter.metricChange(metric3); // added by change
+ reporter.metricRemoval(metric2); // added in init, deleted by removal
+
+ reportRunnable.getValue().run();
+ JsonNode payload = new ObjectMapper().readTree(httpPayload.getValue());
+ assertTrue(payload.isObject());
+ assertPayloadHasClientInfo(payload);
+
+ // We should be left with the modified version of metric1 and metric3
+ JsonNode metrics = payload.get("metrics");
+ assertTrue(metrics.isArray());
+ assertEquals(2, metrics.size());
+
+ JsonNode m1 = metrics.get(0);
+ assertEquals("name1", m1.get("name").textValue());
+ assertEquals("group1", m1.get("group").textValue());
+ JsonNode m1Tags = m1.get("tags");
+ assertTrue(m1Tags.isObject());
+ assertEquals(1, m1Tags.size());
+ assertEquals("value1", m1Tags.get("key1").textValue());
+ assertEquals(-1.0, m1.get("value").doubleValue(), 0.0);
+
+ JsonNode m3 = metrics.get(1);
+ assertEquals("name3", m3.get("name").textValue());
+ assertEquals("group3", m3.get("group").textValue());
+ JsonNode m3Tags = m3.get("tags");
+ assertTrue(m3Tags.isObject());
+ assertEquals(1, m3Tags.size());
+ assertEquals("value3", m3Tags.get("key3").textValue());
+ assertEquals(3.0, m3.get("value").doubleValue(), 0.0);
+
+ reporter.close();
+
+ verifyAll();
+ }
+
+ private void expectConfigure() {
+ EasyMock.expect(
+ executor.scheduleAtFixedRate(EasyMock.capture(reportRunnable), EasyMock.eq(5L), EasyMock.eq(5L), EasyMock.eq(TimeUnit.SECONDS))
+ ).andReturn(null); // return value not expected to be used
+ }
+
+ private void configure() {
+ Map<String, String> config = new HashMap<>();
+ config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, URL.toString());
+ config.put(PushHttpMetricsReporter.METRICS_PERIOD_CONFIG, "5");
+ reporter.configure(config);
+ }
+
+ private void expectRequest(int returnStatus) throws Exception {
+ expectRequest(returnStatus, false);
+ }
+
+ // Expect that a request is made with the given response code
+ private void expectRequest(int returnStatus, boolean readResponse) throws Exception {
+ EasyMock.expect(PushHttpMetricsReporter.newHttpConnection(URL)).andReturn(httpReq);
+ httpReq.setRequestMethod("POST");
+ EasyMock.expectLastCall();
+ httpReq.setDoInput(true);
+ EasyMock.expectLastCall();
+ httpReq.setRequestProperty("Content-Type", "application/json");
+ EasyMock.expectLastCall();
+ httpReq.setRequestProperty(EasyMock.eq("Content-Length"), EasyMock.anyString());
+ EasyMock.expectLastCall();
+ httpReq.setRequestProperty("Accept", "*/*");
+ EasyMock.expectLastCall();
+ httpReq.setUseCaches(false);
+ EasyMock.expectLastCall();
+ httpReq.setDoOutput(true);
+ EasyMock.expectLastCall();
+ EasyMock.expect(httpReq.getOutputStream()).andReturn(httpOut);
+ httpOut.write(EasyMock.capture(httpPayload));
+ EasyMock.expectLastCall();
+ httpOut.flush();
+ EasyMock.expectLastCall();
+ httpOut.close();
+ EasyMock.expectLastCall();
+
+ EasyMock.expect(httpReq.getResponseCode()).andReturn(returnStatus);
+
+ if (readResponse)
+ expectReadResponse();
+
+ httpReq.disconnect();
+ EasyMock.expectLastCall();
+ }
+
+ private void assertPayloadHasClientInfo(JsonNode payload) throws UnknownHostException {
+ // Should contain client info...
+ JsonNode client = payload.get("client");
+ assertTrue(client.isObject());
+ assertEquals(InetAddress.getLocalHost().getCanonicalHostName(), client.get("host").textValue());
+ assertEquals("", client.get("client_id").textValue());
+ assertEquals(time.milliseconds(), client.get("time").longValue());
+ }
+
+ private void expectReadResponse() throws Exception {
+ EasyMock.expect(httpReq.getErrorStream()).andReturn(httpErr);
+ EasyMock.expect(PushHttpMetricsReporter.readResponse(httpErr)).andReturn("error response message");
+ EasyMock.expectLastCall();
+ }
+
+ private void expectClose() throws Exception {
+ executor.shutdown();
+ EasyMock.expect(executor.awaitTermination(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class))).andReturn(true);
+ }
+
+ private static class ImmutableValue implements Measurable {
+ private final double value;
+
+ public ImmutableValue(double value) {
+ this.value = value;
+ }
+
+ @Override
+ public double measure(MetricConfig config, long now) {
+ return value;
+ }
+ }
+}