You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/12/11 15:20:55 UTC

[flink] 02/02: [FLINK-10252][metrics] Handle oversized metric messages

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c14b9526a57a820d78a80e3851d5f43e09cb99a
Author: yanghua <ya...@gmail.com>
AuthorDate: Thu Oct 18 13:07:53 2018 +0800

    [FLINK-10252][metrics] Handle oversized metric messages
---
 .../metrics/dump/MetricDumpSerialization.java      |  84 +++++++++++-----
 .../runtime/metrics/dump/MetricQueryService.java   |  81 +++++++++++++++
 .../metrics/dump/MetricDumpSerializerTest.java     |   8 +-
 .../metrics/dump/MetricQueryServiceTest.java       | 111 ++++++++++++++++++++-
 .../handler/legacy/metrics/MetricFetcherTest.java  |   2 +-
 5 files changed, 256 insertions(+), 30 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index 16a885d..5456b56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -73,19 +73,38 @@ public class MetricDumpSerialization {
 
 		private static final long serialVersionUID = 6928770855951536906L;
 
-		public final byte[] serializedMetrics;
+		public final byte[] serializedCounters;
+		public final byte[] serializedGauges;
+		public final byte[] serializedMeters;
+		public final byte[] serializedHistograms;
+
 		public final int numCounters;
 		public final int numGauges;
 		public final int numMeters;
 		public final int numHistograms;
 
-		public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) {
-			Preconditions.checkNotNull(serializedMetrics);
+		public MetricSerializationResult(
+			byte[] serializedCounters,
+			byte[] serializedGauges,
+			byte[] serializedMeters,
+			byte[] serializedHistograms,
+			int numCounters,
+			int numGauges,
+			int numMeters,
+			int numHistograms) {
+
+			Preconditions.checkNotNull(serializedCounters);
+			Preconditions.checkNotNull(serializedGauges);
+			Preconditions.checkNotNull(serializedMeters);
+			Preconditions.checkNotNull(serializedHistograms);
 			Preconditions.checkArgument(numCounters >= 0);
 			Preconditions.checkArgument(numGauges >= 0);
 			Preconditions.checkArgument(numMeters >= 0);
 			Preconditions.checkArgument(numHistograms >= 0);
-			this.serializedMetrics = serializedMetrics;
+			this.serializedCounters = serializedCounters;
+			this.serializedGauges = serializedGauges;
+			this.serializedMeters = serializedMeters;
+			this.serializedHistograms = serializedHistograms;
 			this.numCounters = numCounters;
 			this.numGauges = numGauges;
 			this.numMeters = numMeters;
@@ -102,7 +121,10 @@ public class MetricDumpSerialization {
 	 */
 	public static class MetricDumpSerializer {
 
-		private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
+		private DataOutputSerializer countersBuffer = new DataOutputSerializer(1024 * 8);
+		private DataOutputSerializer gaugesBuffer = new DataOutputSerializer(1024 * 8);
+		private DataOutputSerializer metersBuffer = new DataOutputSerializer(1024 * 8);
+		private DataOutputSerializer histogramsBuffer = new DataOutputSerializer(1024 * 8);
 
 		/**
 		 * Serializes the given metrics and returns the resulting byte array.
@@ -126,53 +148,66 @@ public class MetricDumpSerialization {
 			Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
 			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
 
-			buffer.clear();
-
+			countersBuffer.clear();
 			int numCounters = 0;
 			for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
 				try {
-					serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 					numCounters++;
 				} catch (Exception e) {
 					LOG.debug("Failed to serialize counter.", e);
 				}
 			}
 
+			gaugesBuffer.clear();
 			int numGauges = 0;
 			for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
 				try {
-					serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 					numGauges++;
 				} catch (Exception e) {
 					LOG.debug("Failed to serialize gauge.", e);
 				}
 			}
 
+			histogramsBuffer.clear();
 			int numHistograms = 0;
 			for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
 				try {
-					serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					serializeHistogram(histogramsBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 					numHistograms++;
 				} catch (Exception e) {
 					LOG.debug("Failed to serialize histogram.", e);
 				}
 			}
 
+			metersBuffer.clear();
 			int numMeters = 0;
 			for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
 				try {
-					serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					serializeMeter(metersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 					numMeters++;
 				} catch (Exception e) {
 					LOG.debug("Failed to serialize meter.", e);
 				}
 			}
 
-			return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
+			return new MetricSerializationResult(
+				countersBuffer.getCopyOfBuffer(),
+				gaugesBuffer.getCopyOfBuffer(),
+				metersBuffer.getCopyOfBuffer(),
+				histogramsBuffer.getCopyOfBuffer(),
+				numCounters,
+				numGauges,
+				numMeters,
+				numHistograms);
 		}
 
 		public void close() {
-			buffer = null;
+			countersBuffer = null;
+			gaugesBuffer = null;
+			metersBuffer = null;
+			histogramsBuffer = null;
 		}
 	}
 
@@ -280,13 +315,16 @@ public class MetricDumpSerialization {
 		 * @return A list containing the deserialized metrics.
 		 */
 		public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
-			DataInputView in = new DataInputDeserializer(data.serializedMetrics, 0, data.serializedMetrics.length);
+			DataInputView countersInputView = new DataInputDeserializer(data.serializedCounters, 0, data.serializedCounters.length);
+			DataInputView gaugesInputView = new DataInputDeserializer(data.serializedGauges, 0, data.serializedGauges.length);
+			DataInputView metersInputView = new DataInputDeserializer(data.serializedMeters, 0, data.serializedMeters.length);
+			DataInputView histogramsInputView = new DataInputDeserializer(data.serializedHistograms, 0, data.serializedHistograms.length);
 
-			List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
+			List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numMeters + data.numHistograms);
 
 			for (int x = 0; x < data.numCounters; x++) {
 				try {
-					metrics.add(deserializeCounter(in));
+					metrics.add(deserializeCounter(countersInputView));
 				} catch (Exception e) {
 					LOG.debug("Failed to deserialize counter.", e);
 				}
@@ -294,25 +332,25 @@ public class MetricDumpSerialization {
 
 			for (int x = 0; x < data.numGauges; x++) {
 				try {
-					metrics.add(deserializeGauge(in));
+					metrics.add(deserializeGauge(gaugesInputView));
 				} catch (Exception e) {
 					LOG.debug("Failed to deserialize gauge.", e);
 				}
 			}
 
-			for (int x = 0; x < data.numHistograms; x++) {
+			for (int x = 0; x < data.numMeters; x++) {
 				try {
-					metrics.add(deserializeHistogram(in));
+					metrics.add(deserializeMeter(metersInputView));
 				} catch (Exception e) {
-					LOG.debug("Failed to deserialize histogram.", e);
+					LOG.debug("Failed to deserialize meter.", e);
 				}
 			}
 
-			for (int x = 0; x < data.numMeters; x++) {
+			for (int x = 0; x < data.numHistograms; x++) {
 				try {
-					metrics.add(deserializeMeter(in));
+					metrics.add(deserializeHistogram(histogramsInputView));
 				} catch (Exception e) {
-					LOG.debug("Failed to deserialize meter.", e);
+					LOG.debug("Failed to deserialize histogram.", e);
 				}
 			}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index ffda231..fc69d17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -55,6 +55,7 @@ public class MetricQueryService extends UntypedActor {
 	private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
 
 	public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
+	private static final String SIZE_EXCEEDED_LOG_TEMPLATE =  "{} will not be reported as the metric dump would exceed the maximum size of {} bytes.";
 
 	private static final CharacterFilter FILTER = new CharacterFilter() {
 		@Override
@@ -115,6 +116,9 @@ public class MetricQueryService extends UntypedActor {
 				}
 			} else if (message instanceof CreateDump) {
 				MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
+
+				dump = enforceSizeLimit(dump);
+
 				getSender().tell(dump, getSelf());
 			} else {
 				LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
@@ -125,6 +129,83 @@ public class MetricQueryService extends UntypedActor {
 		}
 	}
 
+	private MetricDumpSerialization.MetricSerializationResult enforceSizeLimit(
+		MetricDumpSerialization.MetricSerializationResult serializationResult) {
+		// Admits the sections in the order counters, meters, gauges, histograms; a section that would push the dump past messageSizeLimit is replaced by an empty one.
+		long currentLength = 0; // long, so that adding up section lengths cannot wrap around the int range
+		boolean hasExceededBefore = false;
+
+		byte[] serializedCounters = serializationResult.serializedCounters;
+		int numCounters = serializationResult.numCounters;
+		if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedCounters.length)) {
+			logDumpSizeWouldExceedLimit("Counters", hasExceededBefore);
+			hasExceededBefore = true;
+
+			serializedCounters = new byte[0];
+			numCounters = 0;
+		} else {
+			currentLength += serializedCounters.length;
+		}
+
+		byte[] serializedMeters = serializationResult.serializedMeters;
+		int numMeters = serializationResult.numMeters;
+		if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedMeters.length)) {
+			logDumpSizeWouldExceedLimit("Meters", hasExceededBefore);
+			hasExceededBefore = true;
+
+			serializedMeters = new byte[0];
+			numMeters = 0;
+		} else {
+			currentLength += serializedMeters.length;
+		}
+
+		byte[] serializedGauges = serializationResult.serializedGauges;
+		int numGauges = serializationResult.numGauges;
+		if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedGauges.length)) {
+			logDumpSizeWouldExceedLimit("Gauges", hasExceededBefore);
+			hasExceededBefore = true;
+
+			serializedGauges = new byte[0];
+			numGauges = 0;
+		} else {
+			currentLength += serializedGauges.length;
+		}
+
+		byte[] serializedHistograms = serializationResult.serializedHistograms;
+		int numHistograms = serializationResult.numHistograms;
+		if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedHistograms.length)) {
+			logDumpSizeWouldExceedLimit("Histograms", hasExceededBefore);
+			hasExceededBefore = true; // not read again below; kept so all four sections follow the same pattern
+
+			serializedHistograms = new byte[0];
+			numHistograms = 0;
+		}
+
+		return new MetricDumpSerialization.MetricSerializationResult(
+			serializedCounters,
+			serializedGauges,
+			serializedMeters,
+			serializedHistograms,
+			numCounters,
+			numGauges,
+			numMeters,
+			numHistograms);
+	}
+
+	private boolean exceedsMessageSizeLimit(final long currentSize) { // long: callers pass a sum of array lengths
+		return currentSize > messageSizeLimit;
+	}
+
+	private void logDumpSizeWouldExceedLimit(final String metricType, boolean hasExceededBefore) {
+		if (LOG.isDebugEnabled()) {
+			// with debug logging on, every dropped metric type is named individually
+			LOG.debug(SIZE_EXCEEDED_LOG_TEMPLATE, metricType, messageSizeLimit);
+		} else if (!hasExceededBefore) {
+			// otherwise a single generic message, skipped once the caller reports an earlier exceedance
+			LOG.info(SIZE_EXCEEDED_LOG_TEMPLATE, "Some metrics", messageSizeLimit);
+		}
+	}
+
 	/**
 	 * Lightweight method to replace unsupported characters.
 	 * If the string does not contain any unsupported characters, this method creates no
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 5f83e79..1aab6f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -70,7 +70,10 @@ public class MetricDumpSerializerTest {
 			Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());
 
 		// no metrics should be serialized
-		Assert.assertEquals(0, output.serializedMetrics.length);
+		Assert.assertEquals(0, output.serializedCounters.length);
+		Assert.assertEquals(0, output.serializedGauges.length);
+		Assert.assertEquals(0, output.serializedHistograms.length);
+		Assert.assertEquals(0, output.serializedMeters.length);
 
 		List<MetricDump> deserialized = deserializer.deserialize(output);
 		Assert.assertEquals(0, deserialized.size());
@@ -141,7 +144,8 @@ public class MetricDumpSerializerTest {
 		gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
 		histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1"));
 
-		MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(counters, gauges, histograms, meters);
+		MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(
+			counters, gauges, histograms, meters);
 		List<MetricDump> deserialized = deserializer.deserialize(serialized);
 
 		// ===== Counters ==============================================================================================
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index afc5962..ccc2236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.metrics.dump;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -38,6 +39,10 @@ import akka.actor.UntypedActor;
 import akka.testkit.TestActorRef;
 import org.junit.Test;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -47,9 +52,8 @@ import static org.junit.Assert.assertTrue;
 public class MetricQueryServiceTest extends TestLogger {
 	@Test
 	public void testCreateDump() throws Exception {
-
 		ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
-		ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, 50L);
+		ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, Long.MAX_VALUE);
 		TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
 		TestActor testActor = (TestActor) testActorRef.underlyingActor();
 
@@ -98,7 +102,10 @@ public class MetricQueryServiceTest extends TestLogger {
 
 		MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
 		testActor.message = null;
-		assertTrue(dump.serializedMetrics.length > 0);
+		assertTrue(dump.serializedCounters.length > 0);
+		assertTrue(dump.serializedGauges.length > 0);
+		assertTrue(dump.serializedHistograms.length > 0);
+		assertTrue(dump.serializedMeters.length > 0);
 
 		MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
 		MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
@@ -114,7 +121,103 @@ public class MetricQueryServiceTest extends TestLogger {
 
 		MetricDumpSerialization.MetricSerializationResult emptyDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
 		testActor.message = null;
-		assertEquals(0, emptyDump.serializedMetrics.length);
+		assertEquals(0, emptyDump.serializedCounters.length);
+		assertEquals(0, emptyDump.serializedGauges.length);
+		assertEquals(0, emptyDump.serializedHistograms.length);
+		assertEquals(0, emptyDump.serializedMeters.length);
+
+		s.terminate();
+	}
+
+	@Test
+	public void testHandleOversizedMetricMessage() throws Exception {
+		ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
+		final long sizeLimit = 200L;
+		ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, sizeLimit);
+		TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
+		TestActor testActor = (TestActor) testActorRef.underlyingActor();
+
+		final Counter c = new SimpleCounter();
+		final Histogram h = new TestHistogram();
+		final Meter m = new Meter() {
+
+			@Override
+			public void markEvent() {
+			}
+
+			@Override
+			public void markEvent(long n) {
+			}
+
+			@Override
+			public double getRate() {
+				return 5;
+			}
+
+			@Override
+			public long getCount() {
+				return 10;
+			}
+		};
+
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+
+		final String gaugeValue = "Hello";
+		final long requiredGaugesToExceedLimit = sizeLimit / gaugeValue.length() + 1;
+		List<Tuple2<String, Gauge<String>>> gauges = LongStream.range(0, requiredGaugesToExceedLimit)
+			.mapToObj(x -> Tuple2.of("gauge" + x, (Gauge<String>) () -> "Hello" + x))
+			.collect(Collectors.toList());
+		gauges.forEach(gauge -> MetricQueryService.notifyOfAddedMetric(serviceActor, gauge.f1, gauge.f0, tm));
+
+		MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);
+		MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
+		MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
+
+		serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+		synchronized (testActor.lock) {
+			while (testActor.message == null) { // loop: wait() may return without a notification
+				testActor.lock.wait();
+			}
+		}
+
+		MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
+		testActor.message = null;
+		assertTrue(dump.serializedCounters.length > 0);
+		assertEquals(1, dump.numCounters);
+		assertTrue(dump.serializedMeters.length > 0);
+		assertEquals(1, dump.numMeters);
+
+		// gauges exceeded the size limit and will be excluded
+		assertEquals(0, dump.serializedGauges.length);
+		assertEquals(0, dump.numGauges);
+
+		assertTrue(dump.serializedHistograms.length > 0);
+		assertEquals(1, dump.numHistograms);
+
+		// unregister all but one gauge to ensure gauges are reported again if the remaining fit
+		for (int x = 1; x < gauges.size(); x++) {
+			MetricQueryService.notifyOfRemovedMetric(serviceActor, gauges.get(x).f1);
+		}
+
+		serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+		synchronized (testActor.lock) {
+			while (testActor.message == null) { // loop: wait() may return without a notification
+				testActor.lock.wait();
+			}
+		}
+
+		MetricDumpSerialization.MetricSerializationResult recoveredDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
+		testActor.message = null;
+
+		assertTrue(recoveredDump.serializedCounters.length > 0);
+		assertEquals(1, recoveredDump.numCounters);
+		assertTrue(recoveredDump.serializedMeters.length > 0);
+		assertEquals(1, recoveredDump.numMeters);
+		assertTrue(recoveredDump.serializedGauges.length > 0);
+		assertEquals(1, recoveredDump.numGauges);
+		assertTrue(recoveredDump.serializedHistograms.length > 0);
+		assertEquals(1, recoveredDump.numHistograms);
 
 		s.terminate();
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index da8182a..61c028f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -99,7 +99,7 @@ public class MetricFetcherTest extends TestLogger {
 		MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmRID, jobID);
 
 		when(jmQueryService.queryMetrics(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
+			.thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], new byte[0], new byte[0], new byte[0], 0, 0, 0, 0)));
 		when(tmQueryService.queryMetrics(any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));