You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/12/11 15:20:55 UTC
[flink] 02/02: [FLINK-10252][metrics] Handle oversized metric messages
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5c14b9526a57a820d78a80e3851d5f43e09cb99a
Author: yanghua <ya...@gmail.com>
AuthorDate: Thu Oct 18 13:07:53 2018 +0800
[FLINK-10252][metrics] Handle oversized metric messages
---
.../metrics/dump/MetricDumpSerialization.java | 84 +++++++++++-----
.../runtime/metrics/dump/MetricQueryService.java | 81 +++++++++++++++
.../metrics/dump/MetricDumpSerializerTest.java | 8 +-
.../metrics/dump/MetricQueryServiceTest.java | 111 ++++++++++++++++++++-
.../handler/legacy/metrics/MetricFetcherTest.java | 2 +-
5 files changed, 256 insertions(+), 30 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index 16a885d..5456b56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -73,19 +73,38 @@ public class MetricDumpSerialization {
private static final long serialVersionUID = 6928770855951536906L;
- public final byte[] serializedMetrics;
+ public final byte[] serializedCounters;
+ public final byte[] serializedGauges;
+ public final byte[] serializedMeters;
+ public final byte[] serializedHistograms;
+
public final int numCounters;
public final int numGauges;
public final int numMeters;
public final int numHistograms;
- public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) {
- Preconditions.checkNotNull(serializedMetrics);
+ public MetricSerializationResult(
+ byte[] serializedCounters,
+ byte[] serializedGauges,
+ byte[] serializedMeters,
+ byte[] serializedHistograms,
+ int numCounters,
+ int numGauges,
+ int numMeters,
+ int numHistograms) {
+
+ Preconditions.checkNotNull(serializedCounters);
+ Preconditions.checkNotNull(serializedGauges);
+ Preconditions.checkNotNull(serializedMeters);
+ Preconditions.checkNotNull(serializedHistograms);
Preconditions.checkArgument(numCounters >= 0);
Preconditions.checkArgument(numGauges >= 0);
Preconditions.checkArgument(numMeters >= 0);
Preconditions.checkArgument(numHistograms >= 0);
- this.serializedMetrics = serializedMetrics;
+ this.serializedCounters = serializedCounters;
+ this.serializedGauges = serializedGauges;
+ this.serializedMeters = serializedMeters;
+ this.serializedHistograms = serializedHistograms;
this.numCounters = numCounters;
this.numGauges = numGauges;
this.numMeters = numMeters;
@@ -102,7 +121,10 @@ public class MetricDumpSerialization {
*/
public static class MetricDumpSerializer {
- private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
+ private DataOutputSerializer countersBuffer = new DataOutputSerializer(1024 * 8);
+ private DataOutputSerializer gaugesBuffer = new DataOutputSerializer(1024 * 8);
+ private DataOutputSerializer metersBuffer = new DataOutputSerializer(1024 * 8);
+ private DataOutputSerializer histogramsBuffer = new DataOutputSerializer(1024 * 8);
/**
* Serializes the given metrics and returns the resulting byte array.
@@ -126,53 +148,66 @@ public class MetricDumpSerialization {
Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
- buffer.clear();
-
+ countersBuffer.clear();
int numCounters = 0;
for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
try {
- serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
numCounters++;
} catch (Exception e) {
LOG.debug("Failed to serialize counter.", e);
}
}
+ gaugesBuffer.clear();
int numGauges = 0;
for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
try {
- serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
numGauges++;
} catch (Exception e) {
LOG.debug("Failed to serialize gauge.", e);
}
}
+ histogramsBuffer.clear();
int numHistograms = 0;
for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
try {
- serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ serializeHistogram(histogramsBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
numHistograms++;
} catch (Exception e) {
LOG.debug("Failed to serialize histogram.", e);
}
}
+ metersBuffer.clear();
int numMeters = 0;
for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
try {
- serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ serializeMeter(metersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
numMeters++;
} catch (Exception e) {
LOG.debug("Failed to serialize meter.", e);
}
}
- return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
+ return new MetricSerializationResult(
+ countersBuffer.getCopyOfBuffer(),
+ gaugesBuffer.getCopyOfBuffer(),
+ metersBuffer.getCopyOfBuffer(),
+ histogramsBuffer.getCopyOfBuffer(),
+ numCounters,
+ numGauges,
+ numMeters,
+ numHistograms);
}
public void close() {
- buffer = null;
+ countersBuffer = null;
+ gaugesBuffer = null;
+ metersBuffer = null;
+ histogramsBuffer = null;
}
}
@@ -280,13 +315,16 @@ public class MetricDumpSerialization {
* @return A list containing the deserialized metrics.
*/
public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
- DataInputView in = new DataInputDeserializer(data.serializedMetrics, 0, data.serializedMetrics.length);
+ DataInputView countersInputView = new DataInputDeserializer(data.serializedCounters, 0, data.serializedCounters.length);
+ DataInputView gaugesInputView = new DataInputDeserializer(data.serializedGauges, 0, data.serializedGauges.length);
+ DataInputView metersInputView = new DataInputDeserializer(data.serializedMeters, 0, data.serializedMeters.length);
+ DataInputView histogramsInputView = new DataInputDeserializer(data.serializedHistograms, 0, data.serializedHistograms.length);
- List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
+ List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numMeters + data.numHistograms);
for (int x = 0; x < data.numCounters; x++) {
try {
- metrics.add(deserializeCounter(in));
+ metrics.add(deserializeCounter(countersInputView));
} catch (Exception e) {
LOG.debug("Failed to deserialize counter.", e);
}
@@ -294,25 +332,25 @@ public class MetricDumpSerialization {
for (int x = 0; x < data.numGauges; x++) {
try {
- metrics.add(deserializeGauge(in));
+ metrics.add(deserializeGauge(gaugesInputView));
} catch (Exception e) {
LOG.debug("Failed to deserialize gauge.", e);
}
}
- for (int x = 0; x < data.numHistograms; x++) {
+ for (int x = 0; x < data.numMeters; x++) {
try {
- metrics.add(deserializeHistogram(in));
+ metrics.add(deserializeMeter(metersInputView));
} catch (Exception e) {
- LOG.debug("Failed to deserialize histogram.", e);
+ LOG.debug("Failed to deserialize meter.", e);
}
}
- for (int x = 0; x < data.numMeters; x++) {
+ for (int x = 0; x < data.numHistograms; x++) {
try {
- metrics.add(deserializeMeter(in));
+ metrics.add(deserializeHistogram(histogramsInputView));
} catch (Exception e) {
- LOG.debug("Failed to deserialize meter.", e);
+ LOG.debug("Failed to deserialize histogram.", e);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index ffda231..fc69d17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -55,6 +55,7 @@ public class MetricQueryService extends UntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
+ private static final String SIZE_EXCEEDED_LOG_TEMPLATE = "{} will not be reported as the metric dump would exceed the maximum size of {} bytes.";
private static final CharacterFilter FILTER = new CharacterFilter() {
@Override
@@ -115,6 +116,9 @@ public class MetricQueryService extends UntypedActor {
}
} else if (message instanceof CreateDump) {
MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
+
+ dump = enforceSizeLimit(dump);
+
getSender().tell(dump, getSelf());
} else {
LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
@@ -125,6 +129,83 @@ public class MetricQueryService extends UntypedActor {
}
}
+ private MetricDumpSerialization.MetricSerializationResult enforceSizeLimit(
+ MetricDumpSerialization.MetricSerializationResult serializationResult) {
+
+ int currentLength = 0;
+ boolean hasExceededBefore = false;
+
+ byte[] serializedCounters = serializationResult.serializedCounters;
+ int numCounters = serializationResult.numCounters;
+ if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedCounters.length)) {
+ logDumpSizeWouldExceedLimit("Counters", hasExceededBefore);
+ hasExceededBefore = true;
+
+ serializedCounters = new byte[0];
+ numCounters = 0;
+ } else {
+ currentLength += serializedCounters.length;
+ }
+
+ byte[] serializedMeters = serializationResult.serializedMeters;
+ int numMeters = serializationResult.numMeters;
+ if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedMeters.length)) {
+ logDumpSizeWouldExceedLimit("Meters", hasExceededBefore);
+ hasExceededBefore = true;
+
+ serializedMeters = new byte[0];
+ numMeters = 0;
+ } else {
+ currentLength += serializedMeters.length;
+ }
+
+ byte[] serializedGauges = serializationResult.serializedGauges;
+ int numGauges = serializationResult.numGauges;
+ if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedGauges.length)) {
+ logDumpSizeWouldExceedLimit("Gauges", hasExceededBefore);
+ hasExceededBefore = true;
+
+ serializedGauges = new byte[0];
+ numGauges = 0;
+ } else {
+ currentLength += serializedGauges.length;
+ }
+
+ byte[] serializedHistograms = serializationResult.serializedHistograms;
+ int numHistograms = serializationResult.numHistograms;
+ if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedHistograms.length)) {
+ logDumpSizeWouldExceedLimit("Histograms", hasExceededBefore);
+ hasExceededBefore = true;
+
+ serializedHistograms = new byte[0];
+ numHistograms = 0;
+ }
+
+ return new MetricDumpSerialization.MetricSerializationResult(
+ serializedCounters,
+ serializedGauges,
+ serializedMeters,
+ serializedHistograms,
+ numCounters,
+ numGauges,
+ numMeters,
+ numHistograms);
+ }
+
+ private boolean exceedsMessageSizeLimit(final int currentSize) {
+ return currentSize > messageSizeLimit;
+ }
+
+ private void logDumpSizeWouldExceedLimit(final String metricType, boolean hasExceededBefore) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(SIZE_EXCEEDED_LOG_TEMPLATE, metricType, messageSizeLimit);
+ } else {
+ if (!hasExceededBefore) {
+ LOG.info(SIZE_EXCEEDED_LOG_TEMPLATE, "Some metrics", messageSizeLimit);
+ }
+ }
+ }
+
/**
* Lightweight method to replace unsupported characters.
* If the string does not contain any unsupported characters, this method creates no
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 5f83e79..1aab6f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -70,7 +70,10 @@ public class MetricDumpSerializerTest {
Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());
// no metrics should be serialized
- Assert.assertEquals(0, output.serializedMetrics.length);
+ Assert.assertEquals(0, output.serializedCounters.length);
+ Assert.assertEquals(0, output.serializedGauges.length);
+ Assert.assertEquals(0, output.serializedHistograms.length);
+ Assert.assertEquals(0, output.serializedMeters.length);
List<MetricDump> deserialized = deserializer.deserialize(output);
Assert.assertEquals(0, deserialized.size());
@@ -141,7 +144,8 @@ public class MetricDumpSerializerTest {
gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1"));
- MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(counters, gauges, histograms, meters);
+ MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(
+ counters, gauges, histograms, meters);
List<MetricDump> deserialized = deserializer.deserialize(serialized);
// ===== Counters ==============================================================================================
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index afc5962..ccc2236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.metrics.dump;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
@@ -38,6 +39,10 @@ import akka.actor.UntypedActor;
import akka.testkit.TestActorRef;
import org.junit.Test;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -47,9 +52,8 @@ import static org.junit.Assert.assertTrue;
public class MetricQueryServiceTest extends TestLogger {
@Test
public void testCreateDump() throws Exception {
-
ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
- ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, 50L);
+ ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, Long.MAX_VALUE);
TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
TestActor testActor = (TestActor) testActorRef.underlyingActor();
@@ -98,7 +102,10 @@ public class MetricQueryServiceTest extends TestLogger {
MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
testActor.message = null;
- assertTrue(dump.serializedMetrics.length > 0);
+ assertTrue(dump.serializedCounters.length > 0);
+ assertTrue(dump.serializedGauges.length > 0);
+ assertTrue(dump.serializedHistograms.length > 0);
+ assertTrue(dump.serializedMeters.length > 0);
MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
@@ -114,7 +121,103 @@ public class MetricQueryServiceTest extends TestLogger {
MetricDumpSerialization.MetricSerializationResult emptyDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
testActor.message = null;
- assertEquals(0, emptyDump.serializedMetrics.length);
+ assertEquals(0, emptyDump.serializedCounters.length);
+ assertEquals(0, emptyDump.serializedGauges.length);
+ assertEquals(0, emptyDump.serializedHistograms.length);
+ assertEquals(0, emptyDump.serializedMeters.length);
+
+ s.terminate();
+ }
+
+ @Test
+ public void testHandleOversizedMetricMessage() throws Exception {
+ ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
+ final long sizeLimit = 200L;
+ ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, sizeLimit);
+ TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
+ TestActor testActor = (TestActor) testActorRef.underlyingActor();
+
+ final Counter c = new SimpleCounter();
+ final Histogram h = new TestHistogram();
+ final Meter m = new Meter() {
+
+ @Override
+ public void markEvent() {
+ }
+
+ @Override
+ public void markEvent(long n) {
+ }
+
+ @Override
+ public double getRate() {
+ return 5;
+ }
+
+ @Override
+ public long getCount() {
+ return 10;
+ }
+ };
+
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+
+ final String gaugeValue = "Hello";
+ final long requiredGaugesToExceedLimit = sizeLimit / gaugeValue.length() + 1;
+ List<Tuple2<String, Gauge<String>>> gauges = LongStream.range(0, requiredGaugesToExceedLimit)
+ .mapToObj(x -> Tuple2.of("gauge" + x, (Gauge<String>) () -> "Hello" + x))
+ .collect(Collectors.toList());
+ gauges.forEach(gauge -> MetricQueryService.notifyOfAddedMetric(serviceActor, gauge.f1, gauge.f0, tm));
+
+ MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);
+ MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
+ MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
+
+ serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+ synchronized (testActor.lock) {
+ if (testActor.message == null) {
+ testActor.lock.wait();
+ }
+ }
+
+ MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
+ testActor.message = null;
+ assertTrue(dump.serializedCounters.length > 0);
+ assertEquals(1, dump.numCounters);
+ assertTrue(dump.serializedMeters.length > 0);
+ assertEquals(1, dump.numMeters);
+
+ // gauges exceeded the size limit and will be excluded
+ assertEquals(0, dump.serializedGauges.length);
+ assertEquals(0, dump.numGauges);
+
+ assertTrue(dump.serializedHistograms.length > 0);
+ assertEquals(1, dump.numHistograms);
+
+ // unregister all but one gauge to ensure gauges are reported again if the remaining fit
+ for (int x = 1; x < gauges.size(); x++) {
+ MetricQueryService.notifyOfRemovedMetric(serviceActor, gauges.get(x).f1);
+ }
+
+ serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+ synchronized (testActor.lock) {
+ if (testActor.message == null) {
+ testActor.lock.wait();
+ }
+ }
+
+ MetricDumpSerialization.MetricSerializationResult recoveredDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
+ testActor.message = null;
+
+ assertTrue(recoveredDump.serializedCounters.length > 0);
+ assertEquals(1, recoveredDump.numCounters);
+ assertTrue(recoveredDump.serializedMeters.length > 0);
+ assertEquals(1, recoveredDump.numMeters);
+ assertTrue(recoveredDump.serializedGauges.length > 0);
+ assertEquals(1, recoveredDump.numGauges);
+ assertTrue(recoveredDump.serializedHistograms.length > 0);
+ assertEquals(1, recoveredDump.numHistograms);
s.terminate();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index da8182a..61c028f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -99,7 +99,7 @@ public class MetricFetcherTest extends TestLogger {
MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmRID, jobID);
when(jmQueryService.queryMetrics(any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
+ .thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], new byte[0], new byte[0], new byte[0], 0, 0, 0, 0)));
when(tmQueryService.queryMetrics(any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));