You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/12/11 13:48:04 UTC

[flink] 01/04: [FLINK-10252][metrics] Pass akkaFrameSize to MetricQueryService

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7e6feead4b349617cd6499b8b5fc786544214ca4
Author: yanghua <ya...@gmail.com>
AuthorDate: Thu Oct 18 13:07:53 2018 +0800

    [FLINK-10252][metrics] Pass akkaFrameSize to MetricQueryService
---
 .../metrics/MetricRegistryConfiguration.java       | 23 ++++++++++++++++++++--
 .../flink/runtime/metrics/MetricRegistryImpl.java  |  5 ++++-
 .../runtime/metrics/dump/MetricQueryService.java   | 15 ++++++++++++--
 .../metrics/dump/MetricQueryServiceTest.java       |  2 +-
 4 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 7188a59..244a1ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
@@ -26,6 +27,8 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.util.Preconditions;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,14 +69,18 @@ public class MetricRegistryConfiguration {
 	// contains for every configured reporter its name and the configuration object
 	private final List<Tuple2<String, Configuration>> reporterConfigurations;
 
+	private final long queryServiceMessageSizeLimit;
+
 	public MetricRegistryConfiguration(
 		ScopeFormats scopeFormats,
 		char delimiter,
-		List<Tuple2<String, Configuration>> reporterConfigurations) {
+		List<Tuple2<String, Configuration>> reporterConfigurations,
+		long queryServiceMessageSizeLimit) {
 
 		this.scopeFormats = Preconditions.checkNotNull(scopeFormats);
 		this.delimiter = delimiter;
 		this.reporterConfigurations = Preconditions.checkNotNull(reporterConfigurations);
+		this.queryServiceMessageSizeLimit = queryServiceMessageSizeLimit;
 	}
 
 	// ------------------------------------------------------------------------
@@ -92,6 +99,10 @@ public class MetricRegistryConfiguration {
 		return reporterConfigurations;
 	}
 
+	public long getQueryServiceMessageSizeLimit() {
+		return queryServiceMessageSizeLimit;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Static factory methods
 	// ------------------------------------------------------------------------
@@ -160,7 +171,15 @@ public class MetricRegistryConfiguration {
 			}
 		}
 
-		return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations);
+		final String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
+		final String akkaConfigStr = String.format("akka {remote {netty.tcp {maximum-frame-size = %s}}}", maxFrameSizeStr);
+		final Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
+		final long maximumFrameSize = akkaConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size");
+
+		// padding to account for serialization overhead
+		final long messageSizeLimitPadding = 256;
+
+		return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations, maximumFrameSize - messageSizeLimitPadding);
 	}
 
 	public static MetricRegistryConfiguration defaultMetricRegistryConfiguration() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index 6b37709..31775e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -77,6 +77,8 @@ public class MetricRegistryImpl implements MetricRegistry {
 
 	private final CompletableFuture<Void> terminationFuture;
 
+	private final long maximumFramesize;
+
 	@Nullable
 	private ActorRef queryService;
 
@@ -91,6 +93,7 @@ public class MetricRegistryImpl implements MetricRegistry {
 	 * Creates a new MetricRegistry and starts the configured reporter.
 	 */
 	public MetricRegistryImpl(MetricRegistryConfiguration config) {
+		this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
 		this.scopeFormats = config.getScopeFormats();
 		this.globalDelimiter = config.getDelimiter();
 		this.delimiters = new ArrayList<>(10);
@@ -184,7 +187,7 @@ public class MetricRegistryImpl implements MetricRegistry {
 			Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
 
 			try {
-				queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+				queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID, maximumFramesize);
 				metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService);
 			} catch (Exception e) {
 				LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 8821e0d..ffda231 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -70,6 +70,12 @@ public class MetricQueryService extends UntypedActor {
 	private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
 	private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
 
+	private final long messageSizeLimit;
+
+	public MetricQueryService(long messageSizeLimit) {
+		this.messageSizeLimit = messageSizeLimit;
+	}
+
 	@Override
 	public void postStop() {
 		serializer.close();
@@ -165,11 +171,16 @@ public class MetricQueryService extends UntypedActor {
 	 * @param resourceID resource ID to disambiguate the actor name
 	 * @return actor reference to the MetricQueryService
 	 */
-	public static ActorRef startMetricQueryService(ActorSystem actorSystem, ResourceID resourceID) {
+	public static ActorRef startMetricQueryService(
+		ActorSystem actorSystem,
+		ResourceID resourceID,
+		long maximumFramesize) {
+
 		String actorName = resourceID == null
 			? METRIC_QUERY_SERVICE_NAME
 			: METRIC_QUERY_SERVICE_NAME + "_" + resourceID.getResourceIdString();
-		return actorSystem.actorOf(Props.create(MetricQueryService.class), actorName);
+
+		return actorSystem.actorOf(Props.create(MetricQueryService.class, maximumFramesize), actorName);
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 3767421..afc5962 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -49,7 +49,7 @@ public class MetricQueryServiceTest extends TestLogger {
 	public void testCreateDump() throws Exception {
 
 		ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
-		ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null);
+		ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, 50L);
 		TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
 		TestActor testActor = (TestActor) testActorRef.underlyingActor();