You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/09/12 18:05:28 UTC
[kafka] branch trunk updated: MINOR: Remove deprecated Metric.value() method usage (#5626)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c121f4e MINOR: Remove deprecated Metric.value() method usage (#5626)
c121f4e is described below
commit c121f4eb82da654acbdd133a556cfe1f9197a46a
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Wed Sep 12 23:35:19 2018 +0530
MINOR: Remove deprecated Metric.value() method usage (#5626)
Reviewers: Viktor Somogyi <vi...@gmail.com>, John Roesler <jo...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
.../clients/consumer/internals/FetcherTest.java | 50 +++++++-------
.../clients/producer/internals/SenderTest.java | 8 +--
.../apache/kafka/common/metrics/MetricsTest.java | 78 +++++++++++++---------
.../common/metrics/stats/FrequenciesTest.java | 24 +++----
.../apache/kafka/common/network/NioEchoServer.java | 2 +-
core/src/main/scala/kafka/utils/ToolsUtils.scala | 2 +-
.../scala/integration/kafka/api/MetricsTest.scala | 2 +-
.../kafka/api/PlaintextConsumerTest.scala | 6 +-
.../other/kafka/ReplicationQuotasTestRig.scala | 2 +-
.../unit/kafka/server/ClientQuotaManagerTest.scala | 12 ++--
.../kafka/server/ReplicationQuotaManagerTest.scala | 2 +-
.../unit/kafka/server/ReplicationQuotasTest.scala | 2 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
.../state/internals/MeteredSessionStoreTest.java | 12 ++--
.../kafka/tools/PushHttpMetricsReporter.java | 2 +-
15 files changed, 110 insertions(+), 96 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 3bf3deb..48d61b9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1576,8 +1576,8 @@ public class FetcherTest {
KafkaMetric avgMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg));
KafkaMetric maxMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax));
// Throttle times are ApiVersions=400, Fetch=(100, 200, 300)
- assertEquals(250, avgMetric.value(), EPSILON);
- assertEquals(400, maxMetric.value(), EPSILON);
+ assertEquals(250, (Double) avgMetric.metricValue(), EPSILON);
+ assertEquals(400, (Double) maxMetric.metricValue(), EPSILON);
client.close();
}
@@ -1599,14 +1599,14 @@ public class FetcherTest {
KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
// recordsFetchLagMax should be initialized to negative infinity
- assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);
+ assertEquals(Double.NEGATIVE_INFINITY, (Double) recordsFetchLagMax.metricValue(), EPSILON);
// recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse
fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
- assertEquals(100, recordsFetchLagMax.value(), EPSILON);
+ assertEquals(100, (Double) recordsFetchLagMax.metricValue(), EPSILON);
KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
- assertEquals(100, partitionLag.value(), EPSILON);
+ assertEquals(100, (Double) partitionLag.metricValue(), EPSILON);
// recordsFetchLagMax should be hw - offset of the last message after receiving a non-empty FetchResponse
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
@@ -1614,8 +1614,8 @@ public class FetcherTest {
for (int v = 0; v < 3; v++)
builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
fetchRecords(tp0, builder.build(), Errors.NONE, 200L, 0);
- assertEquals(197, recordsFetchLagMax.value(), EPSILON);
- assertEquals(197, partitionLag.value(), EPSILON);
+ assertEquals(197, (Double) recordsFetchLagMax.metricValue(), EPSILON);
+ assertEquals(197, (Double) partitionLag.metricValue(), EPSILON);
// verify de-registration of partition lag
subscriptions.unsubscribe();
@@ -1637,14 +1637,14 @@ public class FetcherTest {
KafkaMetric recordsFetchLeadMin = allMetrics.get(minLeadMetric);
// recordsFetchLeadMin should be initialized to MAX_VALUE
- assertEquals(Double.MAX_VALUE, recordsFetchLeadMin.value(), EPSILON);
+ assertEquals(Double.MAX_VALUE, (Double) recordsFetchLeadMin.metricValue(), EPSILON);
// recordsFetchLeadMin should be position - logStartOffset after receiving an empty FetchResponse
fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
- assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
+ assertEquals(0L, (Double) recordsFetchLeadMin.metricValue(), EPSILON);
KafkaMetric partitionLead = allMetrics.get(partitionLeadMetric);
- assertEquals(0L, partitionLead.value(), EPSILON);
+ assertEquals(0L, (Double) partitionLead.metricValue(), EPSILON);
// recordsFetchLeadMin should be position - logStartOffset after receiving a non-empty FetchResponse
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
@@ -1653,8 +1653,8 @@ public class FetcherTest {
builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
}
fetchRecords(tp0, builder.build(), Errors.NONE, 200L, -1L, 0L, 0);
- assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
- assertEquals(3L, partitionLead.value(), EPSILON);
+ assertEquals(0L, (Double) recordsFetchLeadMin.metricValue(), EPSILON);
+ assertEquals(3L, (Double) partitionLead.metricValue(), EPSILON);
// verify de-registration of partition lag
subscriptions.unsubscribe();
@@ -1681,14 +1681,14 @@ public class FetcherTest {
KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
// recordsFetchLagMax should be initialized to negative infinity
- assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);
+ assertEquals(Double.NEGATIVE_INFINITY, (Double) recordsFetchLagMax.metricValue(), EPSILON);
// recordsFetchLagMax should be lso - fetchOffset after receiving an empty FetchResponse
fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
- assertEquals(50, recordsFetchLagMax.value(), EPSILON);
+ assertEquals(50, (Double) recordsFetchLagMax.metricValue(), EPSILON);
KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
- assertEquals(50, partitionLag.value(), EPSILON);
+ assertEquals(50, (Double) partitionLag.metricValue(), EPSILON);
// recordsFetchLagMax should be lso - offset of the last message after receiving a non-empty FetchResponse
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
@@ -1696,8 +1696,8 @@ public class FetcherTest {
for (int v = 0; v < 3; v++)
builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
fetchRecords(tp0, builder.build(), Errors.NONE, 200L, 150L, 0);
- assertEquals(147, recordsFetchLagMax.value(), EPSILON);
- assertEquals(147, partitionLag.value(), EPSILON);
+ assertEquals(147, (Double) recordsFetchLagMax.metricValue(), EPSILON);
+ assertEquals(147, (Double) partitionLag.metricValue(), EPSILON);
// verify de-registration of partition lag
subscriptions.unsubscribe();
@@ -1748,8 +1748,8 @@ public class FetcherTest {
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
- assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
- assertEquals(6, recordsCountAverage.value(), EPSILON);
+ assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+ assertEquals(6, (Double) recordsCountAverage.metricValue(), EPSILON);
}
@Test
@@ -1774,8 +1774,8 @@ public class FetcherTest {
}
fetchRecords(tp0, records, Errors.NONE, 100L, 0);
- assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
- assertEquals(2, recordsCountAverage.value(), EPSILON);
+ assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+ assertEquals(2, (Double) recordsCountAverage.metricValue(), EPSILON);
}
@Test
@@ -1810,8 +1810,8 @@ public class FetcherTest {
for (Record record : records.records())
expectedBytes += record.sizeInBytes();
- assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
- assertEquals(3, recordsCountAverage.value(), EPSILON);
+ assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+ assertEquals(3, (Double) recordsCountAverage.metricValue(), EPSILON);
}
@Test
@@ -1851,8 +1851,8 @@ public class FetcherTest {
for (Record record : records.records())
expectedBytes += record.sizeInBytes();
- assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
- assertEquals(3, recordsCountAverage.value(), EPSILON);
+ assertEquals(expectedBytes, (Double) fetchSizeAverage.metricValue(), EPSILON);
+ assertEquals(3, (Double) recordsCountAverage.metricValue(), EPSILON);
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 1a6e778..f93f343 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -294,8 +294,8 @@ public class SenderTest {
KafkaMetric avgMetric = allMetrics.get(this.senderMetricsRegistry.produceThrottleTimeAvg);
KafkaMetric maxMetric = allMetrics.get(this.senderMetricsRegistry.produceThrottleTimeMax);
// Throttle times are ApiVersions=400, Produce=(100, 200, 300)
- assertEquals(250, avgMetric.value(), EPS);
- assertEquals(400, maxMetric.value(), EPS);
+ assertEquals(250, (Double) avgMetric.metricValue(), EPS);
+ assertEquals(400, (Double) maxMetric.metricValue(), EPS);
client.close();
}
@@ -1771,7 +1771,7 @@ public class SenderTest {
assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());
KafkaMetric recordErrors = m.metrics().get(senderMetrics.recordErrorRate);
- assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);
+ assertTrue("Expected non-zero value for record send errors", (Double) recordErrors.metricValue() > 0);
assertTrue(responseFuture.isDone());
assertEquals(0, (long) transactionManager.sequenceNumber(tp0));
@@ -1914,7 +1914,7 @@ public class SenderTest {
assertEquals("The last ack'd sequence number should be 1", 1, txnManager.lastAckedSequence(tp));
assertEquals("Offset of the first message should be 1", 1L, f2.get().offset());
assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
- assertTrue("There should be a split", m.metrics().get(senderMetrics.batchSplitRate).value() > 0);
+ assertTrue("There should be a split", (Double) (m.metrics().get(senderMetrics.batchSplitRate).metricValue()) > 0);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 5c75d03..eb3f775 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -103,6 +104,10 @@ public class MetricsTest {
@Test
public void testSimpleStats() throws Exception {
+ verifyStats(m -> (double) m.metricValue());
+ }
+
+ private void verifyStats(Function<KafkaMetric, Double> metricValueFunc) {
ConstantMeasurable measurable = new ConstantMeasurable();
metrics.addMetric(metrics.metricName("direct.measurable", "grp1", "The fraction of time an appender waits for space allocation."), measurable);
@@ -132,24 +137,24 @@ public class MetricsTest {
// prior to any time passing
double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0;
assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), count / elapsedSecs,
- metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS);
+ metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.occurences", "grp1"))), EPS);
// pretend 2 seconds passed...
long sleepTimeMs = 2;
time.sleep(sleepTimeMs * 1000);
elapsedSecs += sleepTimeMs;
- assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(metrics.metricName("s2.total", "grp1")).value(), EPS);
- assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(metrics.metricName("test.avg", "grp1")).value(), EPS);
- assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(metrics.metricName("test.max", "grp1")).value(), EPS);
- assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(metrics.metricName("test.min", "grp1")).value(), EPS);
+ assertEquals("s2 reflects the constant value", 5.0, metricValueFunc.apply(metrics.metrics().get(metrics.metricName("s2.total", "grp1"))), EPS);
+ assertEquals("Avg(0...9) = 4.5", 4.5, metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.avg", "grp1"))), EPS);
+ assertEquals("Max(0...9) = 9", count - 1, metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.max", "grp1"))), EPS);
+ assertEquals("Min(0...9) = 0", 0.0, metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.min", "grp1"))), EPS);
assertEquals("Rate(0...9) = 1.40625",
- sum / elapsedSecs, metrics.metrics().get(metrics.metricName("test.rate", "grp1")).value(), EPS);
+ sum / elapsedSecs, metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.rate", "grp1"))), EPS);
assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs),
count / elapsedSecs,
- metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS);
+ metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.occurences", "grp1"))), EPS);
assertEquals("Count(0...9) = 10",
- (double) count, metrics.metrics().get(metrics.metricName("test.count", "grp1")).value(), EPS);
+ (double) count, metricValueFunc.apply(metrics.metrics().get(metrics.metricName("test.count", "grp1"))), EPS);
}
@Test
@@ -172,11 +177,11 @@ public class MetricsTest {
child2.record();
grandchild.record();
- double p1 = parent1.metrics().get(0).value();
- double p2 = parent2.metrics().get(0).value();
- double c1 = child1.metrics().get(0).value();
- double c2 = child2.metrics().get(0).value();
- double gc = grandchild.metrics().get(0).value();
+ double p1 = (double) parent1.metrics().get(0).metricValue();
+ double p2 = (double) parent2.metrics().get(0).metricValue();
+ double c1 = (double) child1.metrics().get(0).metricValue();
+ double c2 = (double) child2.metrics().get(0).metricValue();
+ double gc = (double) grandchild.metrics().get(0).metricValue();
/* each metric should have a count equal to one + its children's count */
assertEquals(1.0, gc, EPS);
@@ -395,7 +400,7 @@ public class MetricsTest {
} catch (QuotaViolationException e) {
// this is good
}
- assertEquals(6.0, metrics.metrics().get(metrics.metricName("test1.total", "grp1")).value(), EPS);
+ assertEquals(6.0, (Double) metrics.metrics().get(metrics.metricName("test1.total", "grp1")).metricValue(), EPS);
sensor.record(-6.0);
try {
sensor.record(-1.0);
@@ -438,24 +443,24 @@ public class MetricsTest {
for (int i = 0; i < buckets; i++)
sensor.record(i);
- assertEquals(25, p25.value(), 1.0);
- assertEquals(50, p50.value(), 1.0);
- assertEquals(75, p75.value(), 1.0);
+ assertEquals(25, (Double) p25.metricValue(), 1.0);
+ assertEquals(50, (Double) p50.metricValue(), 1.0);
+ assertEquals(75, (Double) p75.metricValue(), 1.0);
for (int i = 0; i < buckets; i++)
sensor.record(0.0);
- assertEquals(0.0, p25.value(), 1.0);
- assertEquals(0.0, p50.value(), 1.0);
- assertEquals(0.0, p75.value(), 1.0);
+ assertEquals(0.0, (Double) p25.metricValue(), 1.0);
+ assertEquals(0.0, (Double) p50.metricValue(), 1.0);
+ assertEquals(0.0, (Double) p75.metricValue(), 1.0);
// record two more windows worth of sequential values
for (int i = 0; i < buckets; i++)
sensor.record(i);
- assertEquals(25, p25.value(), 1.0);
- assertEquals(50, p50.value(), 1.0);
- assertEquals(75, p75.value(), 1.0);
+ assertEquals(25, (Double) p25.metricValue(), 1.0);
+ assertEquals(50, (Double) p50.metricValue(), 1.0);
+ assertEquals(75, (Double) p75.metricValue(), 1.0);
}
@Test
@@ -479,7 +484,7 @@ public class MetricsTest {
s.record(100);
sum += 100;
time.sleep(cfg.timeWindowMs());
- assertEquals(sum, totalMetric.value(), EPS);
+ assertEquals(sum, (Double) totalMetric.metricValue(), EPS);
}
// Sleep for half the window.
@@ -490,19 +495,19 @@ public class MetricsTest {
KafkaMetric rateMetric = metrics.metrics().get(rateMetricName);
KafkaMetric countRateMetric = metrics.metrics().get(countRateMetricName);
- assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, rateMetric.value(), EPS);
- assertEquals("Count rate(0...2) = 0.02666", count / elapsedSecs, countRateMetric.value(), EPS);
+ assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, (Double) rateMetric.metricValue(), EPS);
+ assertEquals("Count rate(0...2) = 0.02666", count / elapsedSecs, (Double) countRateMetric.metricValue(), EPS);
assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS);
- assertEquals(sum, totalMetric.value(), EPS);
- assertEquals(count, countTotalMetric.value(), EPS);
+ assertEquals(sum, (Double) totalMetric.metricValue(), EPS);
+ assertEquals(count, (Double) countTotalMetric.metricValue(), EPS);
// Verify that rates are expired, but total is cumulative
time.sleep(cfg.timeWindowMs() * cfg.samples());
- assertEquals(0, rateMetric.value(), EPS);
- assertEquals(0, countRateMetric.value(), EPS);
- assertEquals(sum, totalMetric.value(), EPS);
- assertEquals(count, countTotalMetric.value(), EPS);
+ assertEquals(0, (Double) rateMetric.metricValue(), EPS);
+ assertEquals(0, (Double) countRateMetric.metricValue(), EPS);
+ assertEquals(sum, (Double) totalMetric.metricValue(), EPS);
+ assertEquals(count, (Double) countTotalMetric.metricValue(), EPS);
}
public static class ConstantMeasurable implements Measurable {
@@ -829,4 +834,13 @@ public class MetricsTest {
return sensor;
}
}
+
+ /**
+ * This test is to verify the deprecated {@link Metric#value()} method.
+ * @deprecated This will be removed in a future major release.
+ */
+ @Test
+ public void testDeprecatedMetricValueMethod() {
+ verifyStats(KafkaMetric::value);
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
index 9b6f686..e3e623f 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -124,29 +124,29 @@ public class FrequenciesTest {
for (int i = 0; i != 100; ++i) {
frequencies.record(config, i % 4 + 1, time.milliseconds());
}
- assertEquals(0.25, metric1.value(), DELTA);
- assertEquals(0.25, metric2.value(), DELTA);
- assertEquals(0.25, metric3.value(), DELTA);
- assertEquals(0.25, metric4.value(), DELTA);
+ assertEquals(0.25, (Double) metric1.metricValue(), DELTA);
+ assertEquals(0.25, (Double) metric2.metricValue(), DELTA);
+ assertEquals(0.25, (Double) metric3.metricValue(), DELTA);
+ assertEquals(0.25, (Double) metric4.metricValue(), DELTA);
// Record 2 windows worth of values
for (int i = 0; i != 100; ++i) {
frequencies.record(config, i % 2 + 1, time.milliseconds());
}
- assertEquals(0.50, metric1.value(), DELTA);
- assertEquals(0.50, metric2.value(), DELTA);
- assertEquals(0.00, metric3.value(), DELTA);
- assertEquals(0.00, metric4.value(), DELTA);
+ assertEquals(0.50, (Double) metric1.metricValue(), DELTA);
+ assertEquals(0.50, (Double) metric2.metricValue(), DELTA);
+ assertEquals(0.00, (Double) metric3.metricValue(), DELTA);
+ assertEquals(0.00, (Double) metric4.metricValue(), DELTA);
// Record 1 window worth of values to overlap with the last window
// that is half 1.0 and half 2.0
for (int i = 0; i != 50; ++i) {
frequencies.record(config, 4.0, time.milliseconds());
}
- assertEquals(0.25, metric1.value(), DELTA);
- assertEquals(0.25, metric2.value(), DELTA);
- assertEquals(0.00, metric3.value(), DELTA);
- assertEquals(0.50, metric4.value(), DELTA);
+ assertEquals(0.25, (Double) metric1.metricValue(), DELTA);
+ assertEquals(0.25, (Double) metric2.metricValue(), DELTA);
+ assertEquals(0.00, (Double) metric3.metricValue(), DELTA);
+ assertEquals(0.50, (Double) metric4.metricValue(), DELTA);
}
protected MetricName name(String metricName) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 7996e59..76d37c2 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -115,7 +115,7 @@ public class NioEchoServer extends Thread {
public double metricValue(String name) {
for (Map.Entry<MetricName, KafkaMetric> entry : metrics.metrics().entrySet()) {
if (entry.getKey().name().equals(name))
- return entry.getValue().value();
+ return (double) entry.getValue().metricValue();
}
throw new IllegalStateException("Metric not found, " + name + ", found=" + metrics.metrics().keySet());
}
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 50e04f5..76a12b6 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -52,7 +52,7 @@ object ToolsUtils {
if (maxLengthOfDisplayName < mergedKeyName.length) {
maxLengthOfDisplayName = mergedKeyName.length
}
- (mergedKeyName, value.value())
+ (mergedKeyName, value.metricValue)
}
println(s"\n%-${maxLengthOfDisplayName}s %s".format("Metric Name", "Value"))
sortedMap.foreach {
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 814754d..b2f5a7e 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -259,7 +259,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
group: Option[String]): Double = {
// Use max value of all matching metrics since Selector metrics are recorded for each Processor
verifyKafkaMetric(name, metrics, entity, group) { matchingMetrics =>
- matchingMetrics.foldLeft(0.0)((max, metric) => Math.max(max, metric.value))
+ matchingMetrics.foldLeft(0.0)((max, metric) => Math.max(max, metric.metricValue.asInstanceOf[Double]))
}
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 983b63c..1031483 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1493,7 +1493,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val fetchLag0 = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))
assertNotNull(fetchLag0)
val expectedLag = numMessages - records.count
- assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag0.value, epsilon)
+ assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag0.metricValue.asInstanceOf[Double], epsilon)
// Remove topic from subscription
consumer.subscribe(List(topic2).asJava, listener0)
@@ -1566,7 +1566,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertNotNull(fetchLag)
val expectedLag = numMessages - records.count
- assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.value, epsilon)
+ assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.metricValue.asInstanceOf[Double], epsilon)
consumer.assign(List(tp2).asJava)
TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
@@ -1624,7 +1624,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
tags.put("partition", String.valueOf(tp.partition()))
val lag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
- assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count, lag.value, epsilon)
+ assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count, lag.metricValue.asInstanceOf[Double], epsilon)
}
@Test
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index b2568c1..470ee4d 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -265,7 +265,7 @@ object ReplicationQuotasTestRig {
private def measuredRate(broker: KafkaServer, repType: QuotaType): Double = {
val metricName = broker.metrics.metricName("byte-rate", repType.toString)
if (broker.metrics.metrics.asScala.contains(metricName))
- broker.metrics.metrics.asScala(metricName).value
+ broker.metrics.metrics.asScala(metricName).metricValue.asInstanceOf[Double]
else -1
}
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index c5275c2..6f75174 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -276,7 +276,7 @@ class ClientQuotaManagerTest {
assertEquals(0, maybeRecord(clientMetrics, "ANONYMOUS", "unknown", 400))
time.sleep(1000)
}
- assertEquals(0, queueSizeMetric.value().toInt)
+ assertEquals(0, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
// Create a spike.
// 400*10 + 2000 + 300 = 6300/10.5 = 600 bytes per second.
@@ -287,7 +287,7 @@ class ClientQuotaManagerTest {
assertEquals("Should be throttled", 2100, sleepTime)
throttle(clientMetrics, "ANONYMOYUS", "unknown", sleepTime, callback)
- assertEquals(1, queueSizeMetric.value().toInt)
+ assertEquals(1, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
// After a request is delayed, the callback cannot be triggered immediately
clientMetrics.throttledChannelReaper.doWork()
assertEquals(0, numCallbacks)
@@ -295,7 +295,7 @@ class ClientQuotaManagerTest {
// Callback can only be triggered after the delay time passes
clientMetrics.throttledChannelReaper.doWork()
- assertEquals(0, queueSizeMetric.value().toInt)
+ assertEquals(0, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
assertEquals(1, numCallbacks)
// Could continue to see delays until the bursty sample disappears
@@ -326,7 +326,7 @@ class ClientQuotaManagerTest {
assertEquals(0, maybeRecord(quotaManager, "ANONYMOUS", "test-client", millisToPercent(4)))
time.sleep(1000)
}
- assertEquals(0, queueSizeMetric.value().toInt)
+ assertEquals(0, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
// Create a spike.
// quota = 1% (10ms per second)
@@ -339,7 +339,7 @@ class ClientQuotaManagerTest {
assertEquals("Should be throttled", 210, throttleTime)
throttle(quotaManager, "ANONYMOYUS", "test-client", throttleTime, callback)
- assertEquals(1, queueSizeMetric.value().toInt)
+ assertEquals(1, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
// After a request is delayed, the callback cannot be triggered immediately
quotaManager.throttledChannelReaper.doWork()
assertEquals(0, numCallbacks)
@@ -347,7 +347,7 @@ class ClientQuotaManagerTest {
// Callback can only be triggered after the delay time passes
quotaManager.throttledChannelReaper.doWork()
- assertEquals(0, queueSizeMetric.value().toInt)
+ assertEquals(0, queueSizeMetric.metricValue.asInstanceOf[Double].toInt)
assertEquals(1, numCallbacks)
// Could continue to see delays until the bursty sample disappears
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index b1edd01..dbb52e7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -99,7 +99,7 @@ class ReplicationQuotaManagerTest {
def rate(metrics: Metrics): Double = {
val metricName = metrics.metricName("byte-rate", LeaderReplication.toString, "Tracking byte-rate for " + LeaderReplication)
- val leaderThrottledRate = metrics.metrics.asScala(metricName).value()
+ val leaderThrottledRate = metrics.metrics.asScala(metricName).metricValue.asInstanceOf[Double]
leaderThrottledRate
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 5125486..78300c9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -236,6 +236,6 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
private def measuredRate(broker: KafkaServer, repType: QuotaType): Double = {
val metricName = broker.metrics.metricName("byte-rate", repType.toString)
- broker.metrics.metrics.asScala(metricName).value
+ broker.metrics.metrics.asScala(metricName).metricValue.asInstanceOf[Double]
}
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 7df1bd6..7032724 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -195,7 +195,7 @@ class RequestQuotaTest extends BaseRequestTest {
private def metricValue(metric: KafkaMetric, sensor: Sensor): Double = {
sensor.synchronized {
- if (metric == null) -1.0 else metric.value
+ if (metric == null) -1.0 else metric.metricValue.asInstanceOf[Double]
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 3bd190a..d9e6964 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -116,7 +116,7 @@ public class MeteredSessionStoreTest {
iterator.close();
final KafkaMetric metric = metric("fetch-rate");
- assertTrue(metric.value() > 0);
+ assertTrue((Double) metric.metricValue() > 0);
EasyMock.verify(inner);
}
@@ -133,7 +133,7 @@ public class MeteredSessionStoreTest {
iterator.close();
final KafkaMetric metric = metric("fetch-rate");
- assertTrue(metric.value() > 0);
+ assertTrue((Double) metric.metricValue() > 0);
EasyMock.verify(inner);
}
@@ -147,7 +147,7 @@ public class MeteredSessionStoreTest {
metered.remove(new Windowed<>(key, new SessionWindow(0, 0)));
final KafkaMetric metric = metric("remove-rate");
- assertTrue(metric.value() > 0);
+ assertTrue((Double) metric.metricValue() > 0);
EasyMock.verify(inner);
}
@@ -164,7 +164,7 @@ public class MeteredSessionStoreTest {
iterator.close();
final KafkaMetric metric = metric("fetch-rate");
- assertTrue(metric.value() > 0);
+ assertTrue((Double) metric.metricValue() > 0);
EasyMock.verify(inner);
}
@@ -181,7 +181,7 @@ public class MeteredSessionStoreTest {
iterator.close();
final KafkaMetric metric = metric("fetch-rate");
- assertTrue(metric.value() > 0);
+ assertTrue((Double) metric.metricValue() > 0);
EasyMock.verify(inner);
}
@@ -189,7 +189,7 @@ public class MeteredSessionStoreTest {
public void shouldRecordRestoreTimeOnInit() {
init();
final KafkaMetric metric = metric("restore-rate");
- assertTrue(metric.value() > 0);
+ assertTrue((Double) metric.metricValue() > 0);
}
@Test(expected = NullPointerException.class)
diff --git a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
index c5764b4..6adebf5 100644
--- a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -174,7 +174,7 @@ public class PushHttpMetricsReporter implements MetricsReporter {
samples = new ArrayList<>(metrics.size());
for (KafkaMetric metric : metrics.values()) {
MetricName name = metric.metricName();
- double value = metric.value();
+ double value = (Double) metric.metricValue();
samples.add(new MetricValue(name.name(), name.group(), name.tags(), value));
}
}