You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/12/11 15:20:53 UTC

[flink] branch release-1.7 updated (4f9485f -> 5c14b95)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 4f9485f  [FLINK-10359] [docs] Scala example in DataSet docs is broken
     new f6d93f9  [FLINK-10252][metrics] Pass akkaFrameSize to MetricQueryService
     new 5c14b95  [FLINK-10252][metrics] Handle oversized metric messages

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../metrics/MetricRegistryConfiguration.java       |  23 ++++-
 .../flink/runtime/metrics/MetricRegistryImpl.java  |   5 +-
 .../metrics/dump/MetricDumpSerialization.java      |  84 +++++++++++-----
 .../runtime/metrics/dump/MetricQueryService.java   |  96 +++++++++++++++++-
 .../metrics/dump/MetricDumpSerializerTest.java     |   8 +-
 .../metrics/dump/MetricQueryServiceTest.java       | 111 ++++++++++++++++++++-
 .../handler/legacy/metrics/MetricFetcherTest.java  |   2 +-
 7 files changed, 294 insertions(+), 35 deletions(-)


[flink] 02/02: [FLINK-10252][metrics] Handle oversized metric messages

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c14b9526a57a820d78a80e3851d5f43e09cb99a
Author: yanghua <ya...@gmail.com>
AuthorDate: Thu Oct 18 13:07:53 2018 +0800

    [FLINK-10252][metrics] Handle oversized metric messages
---
 .../metrics/dump/MetricDumpSerialization.java      |  84 +++++++++++-----
 .../runtime/metrics/dump/MetricQueryService.java   |  81 +++++++++++++++
 .../metrics/dump/MetricDumpSerializerTest.java     |   8 +-
 .../metrics/dump/MetricQueryServiceTest.java       | 111 ++++++++++++++++++++-
 .../handler/legacy/metrics/MetricFetcherTest.java  |   2 +-
 5 files changed, 256 insertions(+), 30 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index 16a885d..5456b56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -73,19 +73,38 @@ public class MetricDumpSerialization {
 
 		private static final long serialVersionUID = 6928770855951536906L;
 
-		public final byte[] serializedMetrics;
+		public final byte[] serializedCounters;
+		public final byte[] serializedGauges;
+		public final byte[] serializedMeters;
+		public final byte[] serializedHistograms;
+
 		public final int numCounters;
 		public final int numGauges;
 		public final int numMeters;
 		public final int numHistograms;
 
-		public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) {
-			Preconditions.checkNotNull(serializedMetrics);
+		public MetricSerializationResult(
+			byte[] serializedCounters,
+			byte[] serializedGauges,
+			byte[] serializedMeters,
+			byte[] serializedHistograms,
+			int numCounters,
+			int numGauges,
+			int numMeters,
+			int numHistograms) {
+
+			Preconditions.checkNotNull(serializedCounters);
+			Preconditions.checkNotNull(serializedGauges);
+			Preconditions.checkNotNull(serializedMeters);
+			Preconditions.checkNotNull(serializedHistograms);
 			Preconditions.checkArgument(numCounters >= 0);
 			Preconditions.checkArgument(numGauges >= 0);
 			Preconditions.checkArgument(numMeters >= 0);
 			Preconditions.checkArgument(numHistograms >= 0);
-			this.serializedMetrics = serializedMetrics;
+			this.serializedCounters = serializedCounters;
+			this.serializedGauges = serializedGauges;
+			this.serializedMeters = serializedMeters;
+			this.serializedHistograms = serializedHistograms;
 			this.numCounters = numCounters;
 			this.numGauges = numGauges;
 			this.numMeters = numMeters;
@@ -102,7 +121,10 @@ public class MetricDumpSerialization {
 	 */
 	public static class MetricDumpSerializer {
 
-		private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
+		private DataOutputSerializer countersBuffer = new DataOutputSerializer(1024 * 8);
+		private DataOutputSerializer gaugesBuffer = new DataOutputSerializer(1024 * 8);
+		private DataOutputSerializer metersBuffer = new DataOutputSerializer(1024 * 8);
+		private DataOutputSerializer histogramsBuffer = new DataOutputSerializer(1024 * 8);
 
 		/**
 		 * Serializes the given metrics and returns the resulting byte array.
@@ -126,53 +148,66 @@ public class MetricDumpSerialization {
 			Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
 			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
 
-			buffer.clear();
-
+			countersBuffer.clear();
 			int numCounters = 0;
 			for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
 				try {
-					serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 					numCounters++;
 				} catch (Exception e) {
 					LOG.debug("Failed to serialize counter.", e);
 				}
 			}
 
+			gaugesBuffer.clear();
 			int numGauges = 0;
 			for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
 				try {
-					serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 					numGauges++;
 				} catch (Exception e) {
 					LOG.debug("Failed to serialize gauge.", e);
 				}
 			}
 
+			histogramsBuffer.clear();
 			int numHistograms = 0;
 			for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
 				try {
-					serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					serializeHistogram(histogramsBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 					numHistograms++;
 				} catch (Exception e) {
 					LOG.debug("Failed to serialize histogram.", e);
 				}
 			}
 
+			metersBuffer.clear();
 			int numMeters = 0;
 			for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
 				try {
-					serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					serializeMeter(metersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 					numMeters++;
 				} catch (Exception e) {
 					LOG.debug("Failed to serialize meter.", e);
 				}
 			}
 
-			return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
+			return new MetricSerializationResult(
+				countersBuffer.getCopyOfBuffer(),
+				gaugesBuffer.getCopyOfBuffer(),
+				metersBuffer.getCopyOfBuffer(),
+				histogramsBuffer.getCopyOfBuffer(),
+				numCounters,
+				numGauges,
+				numMeters,
+				numHistograms);
 		}
 
 		public void close() {
-			buffer = null;
+			countersBuffer = null;
+			gaugesBuffer = null;
+			metersBuffer = null;
+			histogramsBuffer = null;
 		}
 	}
 
@@ -280,13 +315,16 @@ public class MetricDumpSerialization {
 		 * @return A list containing the deserialized metrics.
 		 */
 		public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
-			DataInputView in = new DataInputDeserializer(data.serializedMetrics, 0, data.serializedMetrics.length);
+			DataInputView countersInputView = new DataInputDeserializer(data.serializedCounters, 0, data.serializedCounters.length);
+			DataInputView gaugesInputView = new DataInputDeserializer(data.serializedGauges, 0, data.serializedGauges.length);
+			DataInputView metersInputView = new DataInputDeserializer(data.serializedMeters, 0, data.serializedMeters.length);
+			DataInputView histogramsInputView = new DataInputDeserializer(data.serializedHistograms, 0, data.serializedHistograms.length);
 
-			List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
+			List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numMeters + data.numHistograms);
 
 			for (int x = 0; x < data.numCounters; x++) {
 				try {
-					metrics.add(deserializeCounter(in));
+					metrics.add(deserializeCounter(countersInputView));
 				} catch (Exception e) {
 					LOG.debug("Failed to deserialize counter.", e);
 				}
@@ -294,25 +332,25 @@ public class MetricDumpSerialization {
 
 			for (int x = 0; x < data.numGauges; x++) {
 				try {
-					metrics.add(deserializeGauge(in));
+					metrics.add(deserializeGauge(gaugesInputView));
 				} catch (Exception e) {
 					LOG.debug("Failed to deserialize gauge.", e);
 				}
 			}
 
-			for (int x = 0; x < data.numHistograms; x++) {
+			for (int x = 0; x < data.numMeters; x++) {
 				try {
-					metrics.add(deserializeHistogram(in));
+					metrics.add(deserializeMeter(metersInputView));
 				} catch (Exception e) {
-					LOG.debug("Failed to deserialize histogram.", e);
+					LOG.debug("Failed to deserialize meter.", e);
 				}
 			}
 
-			for (int x = 0; x < data.numMeters; x++) {
+			for (int x = 0; x < data.numHistograms; x++) {
 				try {
-					metrics.add(deserializeMeter(in));
+					metrics.add(deserializeHistogram(histogramsInputView));
 				} catch (Exception e) {
-					LOG.debug("Failed to deserialize meter.", e);
+					LOG.debug("Failed to deserialize histogram.", e);
 				}
 			}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index ffda231..fc69d17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -55,6 +55,7 @@ public class MetricQueryService extends UntypedActor {
 	private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
 
 	public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
+	private static final String SIZE_EXCEEDED_LOG_TEMPLATE =  "{} will not be reported as the metric dump would exceed the maximum size of {} bytes.";
 
 	private static final CharacterFilter FILTER = new CharacterFilter() {
 		@Override
@@ -115,6 +116,9 @@ public class MetricQueryService extends UntypedActor {
 				}
 			} else if (message instanceof CreateDump) {
 				MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
+
+				dump = enforceSizeLimit(dump);
+
 				getSender().tell(dump, getSelf());
 			} else {
 				LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
@@ -125,6 +129,83 @@ public class MetricQueryService extends UntypedActor {
 		}
 	}
 
+	private MetricDumpSerialization.MetricSerializationResult enforceSizeLimit(
+		MetricDumpSerialization.MetricSerializationResult serializationResult) {
+
+		int currentLength = 0;
+		boolean hasExceededBefore = false;
+
+		byte[] serializedCounters = serializationResult.serializedCounters;
+		int numCounters = serializationResult.numCounters;
+		if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedCounters.length)) {
+			logDumpSizeWouldExceedLimit("Counters", hasExceededBefore);
+			hasExceededBefore = true;
+
+			serializedCounters = new byte[0];
+			numCounters = 0;
+		} else {
+			currentLength += serializedCounters.length;
+		}
+
+		byte[] serializedMeters = serializationResult.serializedMeters;
+		int numMeters = serializationResult.numMeters;
+		if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedMeters.length)) {
+			logDumpSizeWouldExceedLimit("Meters", hasExceededBefore);
+			hasExceededBefore = true;
+
+			serializedMeters = new byte[0];
+			numMeters = 0;
+		} else {
+			currentLength += serializedMeters.length;
+		}
+
+		byte[] serializedGauges = serializationResult.serializedGauges;
+		int numGauges = serializationResult.numGauges;
+		if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedGauges.length)) {
+			logDumpSizeWouldExceedLimit("Gauges", hasExceededBefore);
+			hasExceededBefore = true;
+
+			serializedGauges = new byte[0];
+			numGauges = 0;
+		} else {
+			currentLength += serializedGauges.length;
+		}
+
+		byte[] serializedHistograms = serializationResult.serializedHistograms;
+		int numHistograms = serializationResult.numHistograms;
+		if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedHistograms.length)) {
+			logDumpSizeWouldExceedLimit("Histograms", hasExceededBefore);
+			hasExceededBefore = true;
+
+			serializedHistograms = new byte[0];
+			numHistograms = 0;
+		}
+
+		return new MetricDumpSerialization.MetricSerializationResult(
+			serializedCounters,
+			serializedGauges,
+			serializedMeters,
+			serializedHistograms,
+			numCounters,
+			numGauges,
+			numMeters,
+			numHistograms);
+	}
+
+	private boolean exceedsMessageSizeLimit(final int currentSize) {
+		return currentSize > messageSizeLimit;
+	}
+
+	private void logDumpSizeWouldExceedLimit(final String metricType, boolean hasExceededBefore) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(SIZE_EXCEEDED_LOG_TEMPLATE, metricType, messageSizeLimit);
+		} else {
+			if (!hasExceededBefore) {
+				LOG.info(SIZE_EXCEEDED_LOG_TEMPLATE, "Some metrics", messageSizeLimit);
+			}
+		}
+	}
+
 	/**
 	 * Lightweight method to replace unsupported characters.
 	 * If the string does not contain any unsupported characters, this method creates no
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 5f83e79..1aab6f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -70,7 +70,10 @@ public class MetricDumpSerializerTest {
 			Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());
 
 		// no metrics should be serialized
-		Assert.assertEquals(0, output.serializedMetrics.length);
+		Assert.assertEquals(0, output.serializedCounters.length);
+		Assert.assertEquals(0, output.serializedGauges.length);
+		Assert.assertEquals(0, output.serializedHistograms.length);
+		Assert.assertEquals(0, output.serializedMeters.length);
 
 		List<MetricDump> deserialized = deserializer.deserialize(output);
 		Assert.assertEquals(0, deserialized.size());
@@ -141,7 +144,8 @@ public class MetricDumpSerializerTest {
 		gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
 		histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1"));
 
-		MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(counters, gauges, histograms, meters);
+		MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(
+			counters, gauges, histograms, meters);
 		List<MetricDump> deserialized = deserializer.deserialize(serialized);
 
 		// ===== Counters ==============================================================================================
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index afc5962..ccc2236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.metrics.dump;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -38,6 +39,10 @@ import akka.actor.UntypedActor;
 import akka.testkit.TestActorRef;
 import org.junit.Test;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -47,9 +52,8 @@ import static org.junit.Assert.assertTrue;
 public class MetricQueryServiceTest extends TestLogger {
 	@Test
 	public void testCreateDump() throws Exception {
-
 		ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
-		ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, 50L);
+		ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, Long.MAX_VALUE);
 		TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
 		TestActor testActor = (TestActor) testActorRef.underlyingActor();
 
@@ -98,7 +102,10 @@ public class MetricQueryServiceTest extends TestLogger {
 
 		MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
 		testActor.message = null;
-		assertTrue(dump.serializedMetrics.length > 0);
+		assertTrue(dump.serializedCounters.length > 0);
+		assertTrue(dump.serializedGauges.length > 0);
+		assertTrue(dump.serializedHistograms.length > 0);
+		assertTrue(dump.serializedMeters.length > 0);
 
 		MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
 		MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
@@ -114,7 +121,103 @@ public class MetricQueryServiceTest extends TestLogger {
 
 		MetricDumpSerialization.MetricSerializationResult emptyDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
 		testActor.message = null;
-		assertEquals(0, emptyDump.serializedMetrics.length);
+		assertEquals(0, emptyDump.serializedCounters.length);
+		assertEquals(0, emptyDump.serializedGauges.length);
+		assertEquals(0, emptyDump.serializedHistograms.length);
+		assertEquals(0, emptyDump.serializedMeters.length);
+
+		s.terminate();
+	}
+
+	@Test
+	public void testHandleOversizedMetricMessage() throws Exception {
+		ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
+		final long sizeLimit = 200L;
+		ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, sizeLimit);
+		TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
+		TestActor testActor = (TestActor) testActorRef.underlyingActor();
+
+		final Counter c = new SimpleCounter();
+		final Histogram h = new TestHistogram();
+		final Meter m = new Meter() {
+
+			@Override
+			public void markEvent() {
+			}
+
+			@Override
+			public void markEvent(long n) {
+			}
+
+			@Override
+			public double getRate() {
+				return 5;
+			}
+
+			@Override
+			public long getCount() {
+				return 10;
+			}
+		};
+
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+
+		final String gaugeValue = "Hello";
+		final long requiredGaugesToExceedLimit = sizeLimit / gaugeValue.length() + 1;
+		List<Tuple2<String, Gauge<String>>> gauges = LongStream.range(0, requiredGaugesToExceedLimit)
+			.mapToObj(x -> Tuple2.of("gauge" + x, (Gauge<String>) () -> "Hello" + x))
+			.collect(Collectors.toList());
+		gauges.forEach(gauge -> MetricQueryService.notifyOfAddedMetric(serviceActor, gauge.f1, gauge.f0, tm));
+
+		MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);
+		MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
+		MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
+
+		serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+		synchronized (testActor.lock) {
+			if (testActor.message == null) {
+				testActor.lock.wait();
+			}
+		}
+
+		MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
+		testActor.message = null;
+		assertTrue(dump.serializedCounters.length > 0);
+		assertEquals(1, dump.numCounters);
+		assertTrue(dump.serializedMeters.length > 0);
+		assertEquals(1, dump.numMeters);
+
+		// gauges exceeded the size limit and will be excluded
+		assertEquals(0, dump.serializedGauges.length);
+		assertEquals(0, dump.numGauges);
+
+		assertTrue(dump.serializedHistograms.length > 0);
+		assertEquals(1, dump.numHistograms);
+
+		// unregister all but one gauge to ensure gauges are reported again if the remaining fit
+		for (int x = 1; x < gauges.size(); x++) {
+			MetricQueryService.notifyOfRemovedMetric(serviceActor, gauges.get(x).f1);
+		}
+
+		serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+		synchronized (testActor.lock) {
+			if (testActor.message == null) {
+				testActor.lock.wait();
+			}
+		}
+
+		MetricDumpSerialization.MetricSerializationResult recoveredDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
+		testActor.message = null;
+
+		assertTrue(recoveredDump.serializedCounters.length > 0);
+		assertEquals(1, recoveredDump.numCounters);
+		assertTrue(recoveredDump.serializedMeters.length > 0);
+		assertEquals(1, recoveredDump.numMeters);
+		assertTrue(recoveredDump.serializedGauges.length > 0);
+		assertEquals(1, recoveredDump.numGauges);
+		assertTrue(recoveredDump.serializedHistograms.length > 0);
+		assertEquals(1, recoveredDump.numHistograms);
 
 		s.terminate();
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index da8182a..61c028f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -99,7 +99,7 @@ public class MetricFetcherTest extends TestLogger {
 		MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmRID, jobID);
 
 		when(jmQueryService.queryMetrics(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
+			.thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], new byte[0], new byte[0], new byte[0], 0, 0, 0, 0)));
 		when(tmQueryService.queryMetrics(any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
 


[flink] 01/02: [FLINK-10252][metrics] Pass akkaFrameSize to MetricQueryService

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6d93f9a8af27b64e6e7f3f1fd3b98cf4204b12d
Author: yanghua <ya...@gmail.com>
AuthorDate: Thu Oct 18 13:07:53 2018 +0800

    [FLINK-10252][metrics] Pass akkaFrameSize to MetricQueryService
---
 .../metrics/MetricRegistryConfiguration.java       | 23 ++++++++++++++++++++--
 .../flink/runtime/metrics/MetricRegistryImpl.java  |  5 ++++-
 .../runtime/metrics/dump/MetricQueryService.java   | 15 ++++++++++++--
 .../metrics/dump/MetricQueryServiceTest.java       |  2 +-
 4 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 7188a59..244a1ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
@@ -26,6 +27,8 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.util.Preconditions;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,14 +69,18 @@ public class MetricRegistryConfiguration {
 	// contains for every configured reporter its name and the configuration object
 	private final List<Tuple2<String, Configuration>> reporterConfigurations;
 
+	private final long queryServiceMessageSizeLimit;
+
 	public MetricRegistryConfiguration(
 		ScopeFormats scopeFormats,
 		char delimiter,
-		List<Tuple2<String, Configuration>> reporterConfigurations) {
+		List<Tuple2<String, Configuration>> reporterConfigurations,
+		long queryServiceMessageSizeLimit) {
 
 		this.scopeFormats = Preconditions.checkNotNull(scopeFormats);
 		this.delimiter = delimiter;
 		this.reporterConfigurations = Preconditions.checkNotNull(reporterConfigurations);
+		this.queryServiceMessageSizeLimit = queryServiceMessageSizeLimit;
 	}
 
 	// ------------------------------------------------------------------------
@@ -92,6 +99,10 @@ public class MetricRegistryConfiguration {
 		return reporterConfigurations;
 	}
 
+	public long getQueryServiceMessageSizeLimit() {
+		return queryServiceMessageSizeLimit;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Static factory methods
 	// ------------------------------------------------------------------------
@@ -160,7 +171,15 @@ public class MetricRegistryConfiguration {
 			}
 		}
 
-		return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations);
+		final String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
+		final String akkaConfigStr = String.format("akka {remote {netty.tcp {maximum-frame-size = %s}}}", maxFrameSizeStr);
+		final Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
+		final long maximumFrameSize = akkaConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size");
+
+		// padding to account for serialization overhead
+		final long messageSizeLimitPadding = 256;
+
+		return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations, maximumFrameSize - messageSizeLimitPadding);
 	}
 
 	public static MetricRegistryConfiguration defaultMetricRegistryConfiguration() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index 6b37709..31775e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -77,6 +77,8 @@ public class MetricRegistryImpl implements MetricRegistry {
 
 	private final CompletableFuture<Void> terminationFuture;
 
+	private final long maximumFramesize;
+
 	@Nullable
 	private ActorRef queryService;
 
@@ -91,6 +93,7 @@ public class MetricRegistryImpl implements MetricRegistry {
 	 * Creates a new MetricRegistry and starts the configured reporter.
 	 */
 	public MetricRegistryImpl(MetricRegistryConfiguration config) {
+		this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
 		this.scopeFormats = config.getScopeFormats();
 		this.globalDelimiter = config.getDelimiter();
 		this.delimiters = new ArrayList<>(10);
@@ -184,7 +187,7 @@ public class MetricRegistryImpl implements MetricRegistry {
 			Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
 
 			try {
-				queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+				queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID, maximumFramesize);
 				metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService);
 			} catch (Exception e) {
 				LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 8821e0d..ffda231 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -70,6 +70,12 @@ public class MetricQueryService extends UntypedActor {
 	private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
 	private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
 
+	private final long messageSizeLimit;
+
+	public MetricQueryService(long messageSizeLimit) {
+		this.messageSizeLimit = messageSizeLimit;
+	}
+
 	@Override
 	public void postStop() {
 		serializer.close();
@@ -165,11 +171,16 @@ public class MetricQueryService extends UntypedActor {
 	 * @param resourceID resource ID to disambiguate the actor name
 	 * @return actor reference to the MetricQueryService
 	 */
-	public static ActorRef startMetricQueryService(ActorSystem actorSystem, ResourceID resourceID) {
+	public static ActorRef startMetricQueryService(
+		ActorSystem actorSystem,
+		ResourceID resourceID,
+		long maximumFramesize) {
+
 		String actorName = resourceID == null
 			? METRIC_QUERY_SERVICE_NAME
 			: METRIC_QUERY_SERVICE_NAME + "_" + resourceID.getResourceIdString();
-		return actorSystem.actorOf(Props.create(MetricQueryService.class), actorName);
+
+		return actorSystem.actorOf(Props.create(MetricQueryService.class, maximumFramesize), actorName);
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 3767421..afc5962 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -49,7 +49,7 @@ public class MetricQueryServiceTest extends TestLogger {
 	public void testCreateDump() throws Exception {
 
 		ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
-		ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null);
+		ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, 50L);
 		TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
 		TestActor testActor = (TestActor) testActorRef.underlyingActor();