You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/12/11 13:48:04 UTC
[flink] 01/04: [FLINK-10252][metrics] Pass akkaFrameSize to MetricQueryService
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7e6feead4b349617cd6499b8b5fc786544214ca4
Author: yanghua <ya...@gmail.com>
AuthorDate: Thu Oct 18 13:07:53 2018 +0800
[FLINK-10252][metrics] Pass akkaFrameSize to MetricQueryService
---
.../metrics/MetricRegistryConfiguration.java | 23 ++++++++++++++++++++--
.../flink/runtime/metrics/MetricRegistryImpl.java | 5 ++++-
.../runtime/metrics/dump/MetricQueryService.java | 15 ++++++++++++--
.../metrics/dump/MetricQueryServiceTest.java | 2 +-
4 files changed, 39 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 7188a59..244a1ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.metrics;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
@@ -26,6 +27,8 @@ import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.util.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,14 +69,18 @@ public class MetricRegistryConfiguration {
// contains for every configured reporter its name and the configuration object
private final List<Tuple2<String, Configuration>> reporterConfigurations;
+ private final long queryServiceMessageSizeLimit;
+
public MetricRegistryConfiguration(
ScopeFormats scopeFormats,
char delimiter,
- List<Tuple2<String, Configuration>> reporterConfigurations) {
+ List<Tuple2<String, Configuration>> reporterConfigurations,
+ long queryServiceMessageSizeLimit) {
this.scopeFormats = Preconditions.checkNotNull(scopeFormats);
this.delimiter = delimiter;
this.reporterConfigurations = Preconditions.checkNotNull(reporterConfigurations);
+ this.queryServiceMessageSizeLimit = queryServiceMessageSizeLimit;
}
// ------------------------------------------------------------------------
@@ -92,6 +99,10 @@ public class MetricRegistryConfiguration {
return reporterConfigurations;
}
+ public long getQueryServiceMessageSizeLimit() {
+ return queryServiceMessageSizeLimit;
+ }
+
// ------------------------------------------------------------------------
// Static factory methods
// ------------------------------------------------------------------------
@@ -160,7 +171,15 @@ public class MetricRegistryConfiguration {
}
}
- return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations);
+ final String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
+ final String akkaConfigStr = String.format("akka {remote {netty.tcp {maximum-frame-size = %s}}}", maxFrameSizeStr);
+ final Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
+ final long maximumFrameSize = akkaConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size");
+
+ // padding to account for serialization overhead
+ final long messageSizeLimitPadding = 256;
+
+ return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations, maximumFrameSize - messageSizeLimitPadding);
}
public static MetricRegistryConfiguration defaultMetricRegistryConfiguration() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index 6b37709..31775e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -77,6 +77,8 @@ public class MetricRegistryImpl implements MetricRegistry {
private final CompletableFuture<Void> terminationFuture;
+ private final long maximumFramesize;
+
@Nullable
private ActorRef queryService;
@@ -91,6 +93,7 @@ public class MetricRegistryImpl implements MetricRegistry {
* Creates a new MetricRegistry and starts the configured reporter.
*/
public MetricRegistryImpl(MetricRegistryConfiguration config) {
+ this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
this.scopeFormats = config.getScopeFormats();
this.globalDelimiter = config.getDelimiter();
this.delimiters = new ArrayList<>(10);
@@ -184,7 +187,7 @@ public class MetricRegistryImpl implements MetricRegistry {
Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
try {
- queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+ queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID, maximumFramesize);
metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService);
} catch (Exception e) {
LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 8821e0d..ffda231 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -70,6 +70,12 @@ public class MetricQueryService extends UntypedActor {
private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+ private final long messageSizeLimit;
+
+ public MetricQueryService(long messageSizeLimit) {
+ this.messageSizeLimit = messageSizeLimit;
+ }
+
@Override
public void postStop() {
serializer.close();
@@ -165,11 +171,16 @@ public class MetricQueryService extends UntypedActor {
* @param resourceID resource ID to disambiguate the actor name
* @return actor reference to the MetricQueryService
*/
- public static ActorRef startMetricQueryService(ActorSystem actorSystem, ResourceID resourceID) {
+ public static ActorRef startMetricQueryService(
+ ActorSystem actorSystem,
+ ResourceID resourceID,
+ long maximumFramesize) {
+
String actorName = resourceID == null
? METRIC_QUERY_SERVICE_NAME
: METRIC_QUERY_SERVICE_NAME + "_" + resourceID.getResourceIdString();
- return actorSystem.actorOf(Props.create(MetricQueryService.class), actorName);
+
+ return actorSystem.actorOf(Props.create(MetricQueryService.class, maximumFramesize), actorName);
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 3767421..afc5962 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -49,7 +49,7 @@ public class MetricQueryServiceTest extends TestLogger {
public void testCreateDump() throws Exception {
ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
- ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null);
+ ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, 50L);
TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
TestActor testActor = (TestActor) testActorRef.underlyingActor();