You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/23 21:18:23 UTC
[1/4] flink git commit: [hotfix] Remove unused variable in
MetricDumpSerializerTest
Repository: flink
Updated Branches:
refs/heads/master da26bdc2e -> a8e85a2d5
[hotfix] Remove unused variable in MetricDumpSerializerTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02d7e4a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02d7e4a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02d7e4a1
Branch: refs/heads/master
Commit: 02d7e4a102d9dba41b508a033a17fbf2ba37f022
Parents: b8fa8c6
Author: zentol <ch...@apache.org>
Authored: Mon Jan 16 11:25:58 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 23 22:18:19 2017 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/02d7e4a1/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index bc0f005..18f03e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -53,7 +53,6 @@ public class MetricDumpSerializerTest {
SimpleCounter c1 = new SimpleCounter();
SimpleCounter c2 = new SimpleCounter();
- SimpleCounter c3 = new SimpleCounter();
c1.inc(1);
c2.inc(2);
[2/4] flink git commit: [FLINK-5298] TM checks that log file exists
Posted by ch...@apache.org.
[FLINK-5298] TM checks that log file exists
This closes #2974.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8fa8c65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8fa8c65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8fa8c65
Branch: refs/heads/master
Commit: b8fa8c65638e8749f9ad994da8ec69a6c34df029
Parents: da26bdc
Author: zentol <ch...@apache.org>
Authored: Thu Dec 8 19:28:12 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 23 22:18:19 2017 +0100
----------------------------------------------------------------------
.../handlers/TaskManagerLogHandlerTest.java | 129 +++++++++++++++++++
.../flink/runtime/taskmanager/TaskManager.scala | 38 +++---
.../runtime/taskmanager/TaskManagerTest.java | 54 ++++++++
3 files changed, 206 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
new file mode 100644
index 0000000..c1e963e
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class TaskManagerLogHandlerTest {
+ @Test
+ public void testLogFetchingFailure() throws Exception {
+ // ========= setup TaskManager =================================================================================
+ InstanceID tmID = new InstanceID();
+ ResourceID tmRID = new ResourceID(tmID.toString());
+ TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
+ when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
+
+ Instance taskManager = mock(Instance.class);
+ when(taskManager.getId()).thenReturn(tmID);
+ when(taskManager.getTaskManagerID()).thenReturn(tmRID);
+ when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
+ CompletableFuture<BlobKey> future = new FlinkCompletableFuture<>();
+ future.completeExceptionally(new IOException("failure"));
+ when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
+
+ // ========= setup JobManager ==================================================================================
+
+ ActorGateway jobManagerGateway = mock(ActorGateway.class);
+ Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
+ JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+
+ when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
+ .thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
+ when(jobManagerGateway.ask(isA(JobManagerMessages.getRequestBlobManagerPort().getClass()), any(FiniteDuration.class)))
+ .thenReturn(Future$.MODULE$.successful((Object) 5));
+ when(jobManagerGateway.ask(isA(JobManagerMessages.RequestTaskManagerInstance.class), any(FiniteDuration.class)))
+ .thenReturn(Future$.MODULE$.successful((Object) new JobManagerMessages.TaskManagerInstance(Option.apply(taskManager))));
+ when(jobManagerGateway.path()).thenReturn("/jm/address");
+
+ JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+ when(retriever.getJobManagerGatewayAndWebPort())
+ .thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
+
+
+ TaskManagerLogHandler handler = new TaskManagerLogHandler(
+ retriever,
+ ExecutionContext$.MODULE$.fromExecutor(Executors.directExecutor()),
+ Future$.MODULE$.successful("/jm/address"),
+ AkkaUtils.getDefaultClientTimeout(),
+ TaskManagerLogHandler.FileMode.LOG,
+ new Configuration(),
+ false);
+
+ final AtomicReference<String> exception = new AtomicReference<>();
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.write(isA(ByteBuf.class))).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ByteBuf data = invocationOnMock.getArgumentAt(0, ByteBuf.class);
+ exception.set(new String(data.array()));
+ return null;
+ }
+ });
+
+ Map<String, String> pathParams = new HashMap<>();
+ pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, tmID.toString());
+ Routed routed = mock(Routed.class);
+ when(routed.pathParams()).thenReturn(pathParams);
+ when(routed.request()).thenReturn(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + tmID + "/log"));
+
+ handler.respondAsLeader(ctx, routed, jobManagerGateway);
+
+ Assert.assertEquals("Fetching TaskManager log failed.", exception.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7b0e7d0..3c142cd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -333,7 +333,8 @@ class TaskManager(
case Some(_) =>
handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get)
case None =>
- sender() ! new IOException("BlobService not available. Cannot upload TaskManager logs.")
+ sender() ! akka.actor.Status.Failure(new IOException("BlobService not " +
+ "available. Cannot upload TaskManager logs."))
}
case RequestBroadcastVariablesWithReferences =>
@@ -823,28 +824,35 @@ class TaskManager(
val logFilePathOption = Option(config.getConfiguration().getString(
ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file")))
logFilePathOption match {
- case None => throw new IOException("TaskManager log files are unavailable. " +
+ case None => sender ! akka.actor.Status.Failure(
+ new IOException("TaskManager log files are unavailable. " +
"Log file location not found in environment variable log.file or configuration key "
- + ConfigConstants.TASK_MANAGER_LOG_PATH_KEY + ".");
+ + ConfigConstants.TASK_MANAGER_LOG_PATH_KEY + "."))
case Some(logFilePath) =>
val file: File = requestType match {
case LogFileRequest => new File(logFilePath);
case StdOutFileRequest =>
new File(logFilePath.substring(0, logFilePath.length - 4) + ".out");
}
- val fis = new FileInputStream(file)
- Future {
- val client: BlobClient = blobService.get.createClient()
- client.put(fis)
- }(context.dispatcher)
- .onComplete {
- case Success(value) =>
- sender ! value
- fis.close()
- case Failure(e) =>
- sender ! e
- fis.close()
+ if (file.exists()) {
+ val fis = new FileInputStream(file);
+ Future {
+ val client: BlobClient = blobService.get.createClient()
+ client.put(fis);
}(context.dispatcher)
+ .onComplete {
+ case Success(value) =>
+ sender ! value
+ fis.close()
+ case Failure(e) =>
+ sender ! akka.actor.Status.Failure(e)
+ fis.close()
+ }(context.dispatcher)
+ } else {
+ sender ! akka.actor.Status.Failure(
+ new IOException("TaskManager log files are unavailable. " +
+ "Log file could not be found at " + file.getAbsolutePath + "."))
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 99c1c1f..cc661ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -79,6 +79,7 @@ import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -1075,6 +1076,59 @@ public class TaskManagerTest extends TestLogger {
}};
}
+ @Test
+ public void testLogNotFoundHandling() throws Exception {
+
+ new JavaTestKit(system){{
+
+ // we require a JobManager so that the BlobService is also started
+ ActorGateway jobManager = null;
+ ActorGateway taskManager = null;
+
+ try {
+
+ // Create the JM
+ ActorRef jm = system.actorOf(Props.create(
+ new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+
+ jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+ final int dataPort = NetUtils.getAvailablePort();
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+ config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+ config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+ config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+ taskManager = TestingUtils.createTaskManager(
+ system,
+ jobManager,
+ config,
+ false,
+ true);
+
+ // ---------------------------------------------------------------------------------
+
+ final ActorGateway tm = taskManager;
+
+ new Within(d) {
+ @Override
+ protected void run() {
+ Future<Object> logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
+ try {
+ Await.result(logFuture, timeout);
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().startsWith("TaskManager log files are unavailable. Log file could not be found at"));
+ }
+ }
+ };
+ } finally {
+ TestingUtils.stopActor(taskManager);
+ TestingUtils.stopActor(jobManager);
+ }
+ }};}
+
// ------------------------------------------------------------------------
// Stack trace sample
// ------------------------------------------------------------------------
[4/4] flink git commit: [FLINK-5464] [metrics] Improve
MetricDumpSerialization exception handling
Posted by ch...@apache.org.
[FLINK-5464] [metrics] Improve MetricDumpSerialization exception handling
This closes #3128.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8e85a2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8e85a2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8e85a2d
Branch: refs/heads/master
Commit: a8e85a2d5abcaf2defab27be0027190ac3ecb5d5
Parents: 7704724
Author: zentol <ch...@apache.org>
Authored: Thu Jan 12 12:42:26 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 23 22:18:20 2017 +0100
----------------------------------------------------------------------
.../webmonitor/metrics/MetricFetcher.java | 6 +-
.../webmonitor/metrics/MetricFetcherTest.java | 6 +-
.../metrics/dump/MetricDumpSerialization.java | 321 ++++++++++++-------
.../metrics/dump/MetricQueryService.java | 2 +-
.../metrics/dump/MetricDumpSerializerTest.java | 47 ++-
.../metrics/dump/MetricQueryServiceTest.java | 23 +-
6 files changed, 264 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 621f4d9..7ffadce 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
@@ -42,7 +43,6 @@ import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -191,8 +191,8 @@ public class MetricFetcher {
logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
}
- private void addMetrics(Object result) throws IOException {
- byte[] data = (byte[]) result;
+ private void addMetrics(Object result) {
+ MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result;
List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
for (MetricDump metric : dumpedMetrics) {
metrics.add(metric);
http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 58048e6..d644c23 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -115,7 +115,7 @@ public class MetricFetcherTest extends TestLogger {
MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
- .thenReturn(Future$.MODULE$.successful((Object) new byte[16]));
+ .thenReturn(Future$.MODULE$.successful((Object) new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
MetricFetcher.BasicGateway tmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
@@ -171,7 +171,7 @@ public class MetricFetcherTest extends TestLogger {
}
}
- private static byte[] createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
+ private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
@@ -213,7 +213,7 @@ public class MetricFetcherTest extends TestLogger {
histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));
MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
- byte[] dump = serializer.serialize(counters, gauges, histograms, meters);
+ MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
serializer.close();
return dump;
http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index 143faaf..e57a0d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -17,20 +17,23 @@
*/
package org.apache.flink.runtime.metrics.dump;
-import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.Meter;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -45,208 +48,287 @@ import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY
* Utility class for the serialization of metrics.
*/
public class MetricDumpSerialization {
+
private static final Logger LOG = LoggerFactory.getLogger(MetricDumpSerialization.class);
private MetricDumpSerialization() {
}
+ /**
+ * This class encapsulates all serialized metrics and a count for each metric type.
+ *
+ * The counts are stored separately from the metrics since the final count for any given type can only be
+ * determined after all metrics of that type were serialized. Storing them together in a single byte[] would
+ * require an additional copy of all serialized metrics, as you would first have to serialize the metrics into a
+ * temporary buffer to calculate the counts, write the counts to the final output and copy all metrics from the
+ * temporary buffer.
+ *
+ * Note that while one could implement the serialization in such a way so that at least 1 byte (a validity flag)
+ * is written for each metric, this would require more bandwidth due to the sheer number of metrics.
+ */
+ public static class MetricSerializationResult implements Serializable {
+
+ private static final long serialVersionUID = 6928770855951536906L;
+
+ public final byte[] serializedMetrics;
+ public final int numCounters;
+ public final int numGauges;
+ public final int numMeters;
+ public final int numHistograms;
+
+ public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) {
+ Preconditions.checkNotNull(serializedMetrics);
+ Preconditions.checkArgument(numCounters >= 0);
+ Preconditions.checkArgument(numGauges >= 0);
+ Preconditions.checkArgument(numMeters >= 0);
+ Preconditions.checkArgument(numHistograms >= 0);
+ this.serializedMetrics = serializedMetrics;
+ this.numCounters = numCounters;
+ this.numGauges = numGauges;
+ this.numMeters = numMeters;
+ this.numHistograms = numHistograms;
+ }
+ }
+
//-------------------------------------------------------------------------
// Serialization
//-------------------------------------------------------------------------
+
public static class MetricDumpSerializer {
- private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- private DataOutputStream dos = new DataOutputStream(baos);
+
+ private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
/**
* Serializes the given metrics and returns the resulting byte array.
+ *
+ * Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned
+ * {@link MetricSerializationResult}.
+ *
+ * If the serialization of any primitive or String fails then the returned {@link MetricSerializationResult}
+ * is partially corrupted. Such a result can be deserialized safely by
+ * {@link MetricDumpDeserializer#deserialize(MetricSerializationResult)}; however only metrics that were
+ * fully serialized before the failure will be returned.
*
* @param counters counters to serialize
* @param gauges gauges to serialize
* @param histograms histograms to serialize
- * @return byte array containing the serialized metrics
- * @throws IOException
+ * @return MetricSerializationResult containing the serialized metrics and the count of each metric type
*/
- public byte[] serialize(
+ public MetricSerializationResult serialize(
Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
- Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
-
- baos.reset();
- dos.writeInt(counters.size());
- dos.writeInt(gauges.size());
- dos.writeInt(histograms.size());
- dos.writeInt(meters.size());
+ Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+
+ buffer.clear();
+ int numCounters = 0;
for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
- serializeMetricInfo(dos, entry.getValue().f0);
- serializeString(dos, entry.getValue().f1);
- serializeCounter(dos, entry.getKey());
+ try {
+ serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ numCounters++;
+ } catch (Exception e) {
+ LOG.debug("Failed to serialize counter.", e);
+ }
}
+ int numGauges = 0;
for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
- serializeMetricInfo(dos, entry.getValue().f0);
- serializeString(dos, entry.getValue().f1);
- serializeGauge(dos, entry.getKey());
+ try {
+ serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ numGauges++;
+ } catch (Exception e) {
+ LOG.debug("Failed to serialize gauge.", e);
+ }
}
+ int numHistograms = 0;
for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
- serializeMetricInfo(dos, entry.getValue().f0);
- serializeString(dos, entry.getValue().f1);
- serializeHistogram(dos, entry.getKey());
+ try {
+ serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ numHistograms++;
+ } catch (Exception e) {
+ LOG.debug("Failed to serialize histogram.", e);
+ }
}
+ int numMeters = 0;
for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
- serializeMetricInfo(dos, entry.getValue().f0);
- serializeString(dos, entry.getValue().f1);
- serializeMeter(dos, entry.getKey());
+ try {
+ serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ numMeters++;
+ } catch (Exception e) {
+ LOG.debug("Failed to serialize meter.", e);
+ }
}
- return baos.toByteArray();
+
+ return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
}
public void close() {
- try {
- dos.close();
- } catch (Exception e) {
- LOG.debug("Failed to close OutputStream.", e);
- }
- try {
- baos.close();
- } catch (Exception e) {
- LOG.debug("Failed to close OutputStream.", e);
- }
+ buffer = null;
}
}
- private static void serializeMetricInfo(DataOutputStream dos, QueryScopeInfo info) throws IOException {
- serializeString(dos, info.scope);
- dos.writeByte(info.getCategory());
+ private static void serializeMetricInfo(DataOutput out, QueryScopeInfo info) throws IOException {
+ out.writeUTF(info.scope);
+ out.writeByte(info.getCategory());
switch (info.getCategory()) {
case INFO_CATEGORY_JM:
break;
case INFO_CATEGORY_TM:
String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
- serializeString(dos, tmID);
+ out.writeUTF(tmID);
break;
case INFO_CATEGORY_JOB:
QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
- serializeString(dos, jobInfo.jobID);
+ out.writeUTF(jobInfo.jobID);
break;
case INFO_CATEGORY_TASK:
QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
- serializeString(dos, taskInfo.jobID);
- serializeString(dos, taskInfo.vertexID);
- dos.writeInt(taskInfo.subtaskIndex);
+ out.writeUTF(taskInfo.jobID);
+ out.writeUTF(taskInfo.vertexID);
+ out.writeInt(taskInfo.subtaskIndex);
break;
case INFO_CATEGORY_OPERATOR:
QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
- serializeString(dos, operatorInfo.jobID);
- serializeString(dos, operatorInfo.vertexID);
- dos.writeInt(operatorInfo.subtaskIndex);
- serializeString(dos, operatorInfo.operatorName);
+ out.writeUTF(operatorInfo.jobID);
+ out.writeUTF(operatorInfo.vertexID);
+ out.writeInt(operatorInfo.subtaskIndex);
+ out.writeUTF(operatorInfo.operatorName);
break;
+ default:
+ throw new IOException("Unknown scope category: " + info.getCategory());
}
}
- private static void serializeString(DataOutputStream dos, String string) throws IOException {
- byte[] bytes = string.getBytes();
- dos.writeInt(bytes.length);
- dos.write(bytes);
+ private static void serializeCounter(DataOutput out, QueryScopeInfo info, String name, Counter counter) throws IOException {
+ long count = counter.getCount();
+ serializeMetricInfo(out, info);
+ out.writeUTF(name);
+ out.writeLong(count);
}
- private static void serializeCounter(DataOutputStream dos, Counter counter) throws IOException {
- dos.writeLong(counter.getCount());
- }
+ private static void serializeGauge(DataOutput out, QueryScopeInfo info, String name, Gauge<?> gauge) throws IOException {
+ Object value = gauge.getValue();
+ if (value == null) {
+ throw new NullPointerException("Value returned by gauge " + name + " was null.");
+ }
+ String stringValue = gauge.getValue().toString();
+ if (stringValue == null) {
+ throw new NullPointerException("toString() of the value returned by gauge " + name + " returned null.");
+ }
- private static void serializeGauge(DataOutputStream dos, Gauge<?> gauge) throws IOException {
- serializeString(dos, gauge.getValue().toString());
+ serializeMetricInfo(out, info);
+ out.writeUTF(name);
+ out.writeUTF(stringValue);
}
- private static void serializeHistogram(DataOutputStream dos, Histogram histogram) throws IOException {
+ private static void serializeHistogram(DataOutput out, QueryScopeInfo info, String name, Histogram histogram) throws IOException {
HistogramStatistics stat = histogram.getStatistics();
-
- dos.writeLong(stat.getMin());
- dos.writeLong(stat.getMax());
- dos.writeDouble(stat.getMean());
- dos.writeDouble(stat.getQuantile(0.5));
- dos.writeDouble(stat.getStdDev());
- dos.writeDouble(stat.getQuantile(0.75));
- dos.writeDouble(stat.getQuantile(0.90));
- dos.writeDouble(stat.getQuantile(0.95));
- dos.writeDouble(stat.getQuantile(0.98));
- dos.writeDouble(stat.getQuantile(0.99));
- dos.writeDouble(stat.getQuantile(0.999));
+ long min = stat.getMin();
+ long max = stat.getMax();
+ double mean = stat.getMean();
+ double median = stat.getQuantile(0.5);
+ double stddev = stat.getStdDev();
+ double p75 = stat.getQuantile(0.75);
+ double p90 = stat.getQuantile(0.90);
+ double p95 = stat.getQuantile(0.95);
+ double p98 = stat.getQuantile(0.98);
+ double p99 = stat.getQuantile(0.99);
+ double p999 = stat.getQuantile(0.999);
+
+ serializeMetricInfo(out, info);
+ out.writeUTF(name);
+ out.writeLong(min);
+ out.writeLong(max);
+ out.writeDouble(mean);
+ out.writeDouble(median);
+ out.writeDouble(stddev);
+ out.writeDouble(p75);
+ out.writeDouble(p90);
+ out.writeDouble(p95);
+ out.writeDouble(p98);
+ out.writeDouble(p99);
+ out.writeDouble(p999);
}
- private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOException {
- dos.writeDouble(meter.getRate());
+ private static void serializeMeter(DataOutput out, QueryScopeInfo info, String name, Meter meter) throws IOException {
+ serializeMetricInfo(out, info);
+ out.writeUTF(name);
+ out.writeDouble(meter.getRate());
}
//-------------------------------------------------------------------------
// Deserialization
//-------------------------------------------------------------------------
+
public static class MetricDumpDeserializer {
/**
* De-serializes metrics from the given byte array and returns them as a list of {@link MetricDump}.
*
* @param data serialized metrics
* @return A list containing the deserialized metrics.
- * @throws IOException
*/
- public List<MetricDump> deserialize(byte[] data) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
- DataInputStream dis = new DataInputStream(bais);
-
- int numCounters = dis.readInt();
- int numGauges = dis.readInt();
- int numHistograms = dis.readInt();
- int numMeters = dis.readInt();
+ public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
+ DataInputView in = new DataInputDeserializer(data.serializedMetrics, 0, data.serializedMetrics.length);
- List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
+ List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
- for (int x = 0; x < numCounters; x++) {
- metrics.add(deserializeCounter(dis));
+ for (int x = 0; x < data.numCounters; x++) {
+ try {
+ metrics.add(deserializeCounter(in));
+ } catch (Exception e) {
+ LOG.debug("Failed to deserialize counter.", e);
+ }
}
- for (int x = 0; x < numGauges; x++) {
- metrics.add(deserializeGauge(dis));
+ for (int x = 0; x < data.numGauges; x++) {
+ try {
+ metrics.add(deserializeGauge(in));
+ } catch (Exception e) {
+ LOG.debug("Failed to deserialize gauge.", e);
+ }
}
- for (int x = 0; x < numHistograms; x++) {
- metrics.add(deserializeHistogram(dis));
+ for (int x = 0; x < data.numHistograms; x++) {
+ try {
+ metrics.add(deserializeHistogram(in));
+ } catch (Exception e) {
+ LOG.debug("Failed to deserialize histogram.", e);
+ }
}
- for (int x = 0; x < numMeters; x++) {
- metrics.add(deserializeMeter(dis));
+ for (int x = 0; x < data.numMeters; x++) {
+ try {
+ metrics.add(deserializeMeter(in));
+ } catch (Exception e) {
+ LOG.debug("Failed to deserialize meter.", e);
+ }
}
return metrics;
}
}
- private static String deserializeString(DataInputStream dis) throws IOException {
- int stringLength = dis.readInt();
- byte[] bytes = new byte[stringLength];
- dis.readFully(bytes);
- return new String(bytes);
- }
- private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException {
+ private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException {
QueryScopeInfo scope = deserializeMetricInfo(dis);
- String name = deserializeString(dis);
- return new MetricDump.CounterDump(scope, name, dis.readLong());
+ String name = dis.readUTF();
+ long count = dis.readLong();
+ return new MetricDump.CounterDump(scope, name, count);
}
- private static MetricDump.GaugeDump deserializeGauge(DataInputStream dis) throws IOException {
+ private static MetricDump.GaugeDump deserializeGauge(DataInputView dis) throws IOException {
QueryScopeInfo scope = deserializeMetricInfo(dis);
- String name = deserializeString(dis);
- String value = deserializeString(dis);
+ String name = dis.readUTF();
+ String value = dis.readUTF();
return new MetricDump.GaugeDump(scope, name, value);
}
- private static MetricDump.HistogramDump deserializeHistogram(DataInputStream dis) throws IOException {
+ private static MetricDump.HistogramDump deserializeHistogram(DataInputView dis) throws IOException {
QueryScopeInfo info = deserializeMetricInfo(dis);
- String name = deserializeString(dis);
+ String name = dis.readUTF();
long min = dis.readLong();
long max = dis.readLong();
double mean = dis.readDouble();
@@ -258,45 +340,46 @@ public class MetricDumpSerialization {
double p98 = dis.readDouble();
double p99 = dis.readDouble();
double p999 = dis.readDouble();
+
return new MetricDump.HistogramDump(info, name, min, max, mean, median, stddev, p75, p90, p95, p98, p99, p999);
}
- private static MetricDump.MeterDump deserializeMeter(DataInputStream dis) throws IOException {
+ private static MetricDump.MeterDump deserializeMeter(DataInputView dis) throws IOException {
QueryScopeInfo info = deserializeMetricInfo(dis);
- String name = deserializeString(dis);
+ String name = dis.readUTF();
double rate = dis.readDouble();
return new MetricDump.MeterDump(info, name, rate);
}
- private static QueryScopeInfo deserializeMetricInfo(DataInputStream dis) throws IOException {
+ private static QueryScopeInfo deserializeMetricInfo(DataInput dis) throws IOException {
String jobID;
String vertexID;
int subtaskIndex;
- String scope = deserializeString(dis);
+ String scope = dis.readUTF();
byte cat = dis.readByte();
switch (cat) {
case INFO_CATEGORY_JM:
return new QueryScopeInfo.JobManagerQueryScopeInfo(scope);
case INFO_CATEGORY_TM:
- String tmID = deserializeString(dis);
+ String tmID = dis.readUTF();
return new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID, scope);
case INFO_CATEGORY_JOB:
- jobID = deserializeString(dis);
+ jobID = dis.readUTF();
return new QueryScopeInfo.JobQueryScopeInfo(jobID, scope);
case INFO_CATEGORY_TASK:
- jobID = deserializeString(dis);
- vertexID = deserializeString(dis);
+ jobID = dis.readUTF();
+ vertexID = dis.readUTF();
subtaskIndex = dis.readInt();
return new QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope);
case INFO_CATEGORY_OPERATOR:
- jobID = deserializeString(dis);
- vertexID = deserializeString(dis);
+ jobID = dis.readUTF();
+ vertexID = dis.readUTF();
subtaskIndex = dis.readInt();
- String operatorName = deserializeString(dis);
+ String operatorName = dis.readUTF();
return new QueryScopeInfo.OperatorQueryScopeInfo(jobID, vertexID, subtaskIndex, operatorName, scope);
default:
- throw new IOException("sup");
+ throw new IOException("Unknown scope category: " + cat);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 20bc258..2229139 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -106,7 +106,7 @@ public class MetricQueryService extends UntypedActor {
this.meters.remove(metric);
}
} else if (message instanceof CreateDump) {
- byte[] dump = serializer.serialize(counters, gauges, histograms, meters);
+ MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
getSender().tell(dump, getSelf());
} else {
LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 18f03e3..6e3d8f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -24,9 +24,13 @@ import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.junit.Assert;
import org.junit.Test;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,6 +46,47 @@ import static org.junit.Assert.fail;
public class MetricDumpSerializerTest {
@Test
+ public void testNullGaugeHandling() throws IOException {
+ MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+ MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();
+
+ Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+
+ gauges.put(new Gauge<Object>() {
+ @Override
+ public Object getValue() {
+ return null;
+ }
+ }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("A"), "g"));
+
+ MetricDumpSerialization.MetricSerializationResult output = serializer.serialize(
+ Collections.<Counter, Tuple2<QueryScopeInfo,String>>emptyMap(),
+ gauges,
+ Collections.<Histogram, Tuple2<QueryScopeInfo, String>>emptyMap(),
+ Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());
+
+ // no metrics should be serialized
+ Assert.assertEquals(0, output.serializedMetrics.length);
+
+ List<MetricDump> deserialized = deserializer.deserialize(output);
+ Assert.assertEquals(0, deserialized.size());
+ }
+
+ @Test
+ public void testJavaSerialization() throws IOException {
+ MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+
+ final ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
+ final ObjectOutputStream oos = new ObjectOutputStream(bos);
+
+ oos.writeObject(serializer.serialize(
+ new HashMap<Counter, Tuple2<QueryScopeInfo,String>>(),
+ new HashMap<Gauge<?>, Tuple2<QueryScopeInfo,String>>(),
+ new HashMap<Histogram, Tuple2<QueryScopeInfo,String>>(),
+ new HashMap<Meter, Tuple2<QueryScopeInfo,String>>()));
+ }
+
+ @Test
public void testSerialization() throws IOException {
MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();
@@ -92,7 +137,7 @@ public class MetricDumpSerializerTest {
gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1"));
- byte[] serialized = serializer.serialize(counters, gauges, histograms, meters);
+ MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(counters, gauges, histograms, meters);
List<MetricDump> deserialized = deserializer.deserialize(serialized);
// ===== Counters ==============================================================================================
http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 0104e3e..2243495 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -84,13 +84,6 @@ public class MetricQueryServiceTest extends TestLogger {
MetricQueryService.notifyOfAddedMetric(serviceActor, g, "gauge", tm);
MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
-
- // these metrics will be removed *after* the first query
- MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
- MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
- MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
- MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
-
serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
synchronized (testActor.lock) {
if (testActor.message == null) {
@@ -98,9 +91,14 @@ public class MetricQueryServiceTest extends TestLogger {
}
}
- byte[] dump = (byte[]) testActor.message;
+ MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
testActor.message = null;
- assertTrue(dump.length > 0);
+ assertTrue(dump.serializedMetrics.length > 0);
+
+ MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
+ MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
+ MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
+ MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
synchronized (testActor.lock) {
@@ -109,12 +107,9 @@ public class MetricQueryServiceTest extends TestLogger {
}
}
- byte[] emptyDump = (byte[]) testActor.message;
+ MetricDumpSerialization.MetricSerializationResult emptyDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
testActor.message = null;
- assertEquals(16, emptyDump.length);
- for (int x = 0; x < 16; x++) {
- assertEquals(0, emptyDump[x]);
- }
+ assertEquals(0, emptyDump.serializedMetrics.length);
s.shutdown();
}
[3/4] flink git commit: [FLINK-5464] [metrics] Ignore metrics that
are null
Posted by ch...@apache.org.
[FLINK-5464] [metrics] Ignore metrics that are null
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77047244
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77047244
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77047244
Branch: refs/heads/master
Commit: 77047244a1ca2456c2b9fdf7083dd3a7c3aba029
Parents: 02d7e4a
Author: zentol <ch...@apache.org>
Authored: Thu Jan 12 12:41:56 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 23 22:18:20 2017 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/metrics/groups/AbstractMetricGroup.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/77047244/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 6ff9776..a19970d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -345,6 +345,10 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
* @param metric the metric to register
*/
protected void addMetric(String name, Metric metric) {
+ if (metric == null) {
+ LOG.warn("Ignoring attempted registration of a metric due to being null for name {}.", name);
+ return;
+ }
// add the metric only if the group is still open
synchronized (this) {
if (!closed) {