Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/23 21:18:23 UTC

[1/4] flink git commit: [hotfix] Remove unused variable in MetricDumpSerializerTest

Repository: flink
Updated Branches:
  refs/heads/master da26bdc2e -> a8e85a2d5


[hotfix] Remove unused variable in MetricDumpSerializerTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02d7e4a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02d7e4a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02d7e4a1

Branch: refs/heads/master
Commit: 02d7e4a102d9dba41b508a033a17fbf2ba37f022
Parents: b8fa8c6
Author: zentol <ch...@apache.org>
Authored: Mon Jan 16 11:25:58 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 23 22:18:19 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/02d7e4a1/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index bc0f005..18f03e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -53,7 +53,6 @@ public class MetricDumpSerializerTest {
 
 		SimpleCounter c1 = new SimpleCounter();
 		SimpleCounter c2 = new SimpleCounter();
-		SimpleCounter c3 = new SimpleCounter();
 
 		c1.inc(1);
 		c2.inc(2);


[2/4] flink git commit: [FLINK-5298] TM checks that log file exists

Posted by ch...@apache.org.
[FLINK-5298] TM checks that log file exists

This closes #2974.
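
The user-visible effect of replying with akka.actor.Status.Failure (rather than sending the
exception back as a plain message) is that the asker's future now actually fails: Await.result
throws instead of handing the IOException back as a successful result. A minimal caller-side
sketch in Java, with tm and timeout assumed to be in scope as in the TaskManagerTest change
below:

Future<Object> logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
try {
	Await.result(logFuture, timeout);
	// not reached when the log file is missing: the future is failed
} catch (Exception e) {
	// e.getMessage() starts with "TaskManager log files are unavailable."
}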


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8fa8c65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8fa8c65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8fa8c65

Branch: refs/heads/master
Commit: b8fa8c65638e8749f9ad994da8ec69a6c34df029
Parents: da26bdc
Author: zentol <ch...@apache.org>
Authored: Thu Dec 8 19:28:12 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 23 22:18:19 2017 +0100

----------------------------------------------------------------------
 .../handlers/TaskManagerLogHandlerTest.java     | 129 +++++++++++++++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  38 +++---
 .../runtime/taskmanager/TaskManagerTest.java    |  54 ++++++++
 3 files changed, 206 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
new file mode 100644
index 0000000..c1e963e
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class TaskManagerLogHandlerTest {
+	@Test
+	public void testLogFetchingFailure() throws Exception {
+		// ========= setup TaskManager =================================================================================
+		InstanceID tmID = new InstanceID();
+		ResourceID tmRID = new ResourceID(tmID.toString());
+		TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
+		when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
+
+		Instance taskManager = mock(Instance.class);
+		when(taskManager.getId()).thenReturn(tmID);
+		when(taskManager.getTaskManagerID()).thenReturn(tmRID);
+		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
+		CompletableFuture<BlobKey> future = new FlinkCompletableFuture<>();
+		future.completeExceptionally(new IOException("failure"));
+		when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
+
+		// ========= setup JobManager ==================================================================================
+
+		ActorGateway jobManagerGateway = mock(ActorGateway.class);
+		Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
+			JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+
+		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
+		when(jobManagerGateway.ask(isA(JobManagerMessages.getRequestBlobManagerPort().getClass()), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful((Object) 5));
+		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestTaskManagerInstance.class), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful((Object) new JobManagerMessages.TaskManagerInstance(Option.apply(taskManager))));
+		when(jobManagerGateway.path()).thenReturn("/jm/address");
+
+		JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+		when(retriever.getJobManagerGatewayAndWebPort())
+			.thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
+
+
+		TaskManagerLogHandler handler = new TaskManagerLogHandler(
+			retriever,
+			ExecutionContext$.MODULE$.fromExecutor(Executors.directExecutor()),
+			Future$.MODULE$.successful("/jm/address"),
+			AkkaUtils.getDefaultClientTimeout(),
+			TaskManagerLogHandler.FileMode.LOG,
+			new Configuration(),
+			false);
+
+		final AtomicReference<String> exception = new AtomicReference<>();
+		
+		ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+		when(ctx.write(isA(ByteBuf.class))).thenAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				ByteBuf data = invocationOnMock.getArgumentAt(0, ByteBuf.class);
+				exception.set(new String(data.array()));
+				return null;
+			}
+		});
+
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, tmID.toString());
+		Routed routed = mock(Routed.class);
+		when(routed.pathParams()).thenReturn(pathParams);
+		when(routed.request()).thenReturn(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + tmID + "/log"));
+
+		handler.respondAsLeader(ctx, routed, jobManagerGateway);
+
+		Assert.assertEquals("Fetching TaskManager log failed.", exception.get());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7b0e7d0..3c142cd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -333,7 +333,8 @@ class TaskManager(
         case Some(_) =>
           handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get)
         case None =>
-          sender() ! new IOException("BlobService not available. Cannot upload TaskManager logs.")
+          sender() ! akka.actor.Status.Failure(new IOException("BlobService not " +
+            "available. Cannot upload TaskManager logs."))
       }
 
     case RequestBroadcastVariablesWithReferences =>
@@ -823,28 +824,35 @@ class TaskManager(
     val logFilePathOption = Option(config.getConfiguration().getString(
       ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file")))
     logFilePathOption match {
-      case None => throw new IOException("TaskManager log files are unavailable. " +
+      case None => sender ! akka.actor.Status.Failure(
+        new IOException("TaskManager log files are unavailable. " +
         "Log file location not found in environment variable log.file or configuration key "
-        + ConfigConstants.TASK_MANAGER_LOG_PATH_KEY + ".");
+        + ConfigConstants.TASK_MANAGER_LOG_PATH_KEY + "."))
       case Some(logFilePath) =>
         val file: File = requestType match {
           case LogFileRequest => new File(logFilePath);
           case StdOutFileRequest =>
             new File(logFilePath.substring(0, logFilePath.length - 4) + ".out");
         }
-        val fis = new FileInputStream(file)
-        Future {
-          val client: BlobClient = blobService.get.createClient()
-          client.put(fis)
-        }(context.dispatcher)
-          .onComplete {
-            case Success(value) => 
-              sender ! value
-              fis.close()
-            case Failure(e) =>
-              sender ! e
-              fis.close()
+        if (file.exists()) {
+          val fis = new FileInputStream(file);
+          Future {
+            val client: BlobClient = blobService.get.createClient()
+            client.put(fis);
           }(context.dispatcher)
+            .onComplete {
+              case Success(value) =>
+                sender ! value
+                fis.close()
+              case Failure(e) =>
+                sender ! akka.actor.Status.Failure(e)
+                fis.close()
+            }(context.dispatcher)
+        } else {
+          sender ! akka.actor.Status.Failure(
+            new IOException("TaskManager log files are unavailable. " +
+            "Log file could not be found at " + file.getAbsolutePath + "."))
+        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8fa8c65/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 99c1c1f..cc661ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -79,6 +79,7 @@ import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -1075,6 +1076,59 @@ public class TaskManagerTest extends TestLogger {
 		}};
 	}
 
+	@Test
+	public void testLogNotFoundHandling() throws Exception {
+
+		new JavaTestKit(system){{
+
+			// we require a JobManager so that the BlobService is also started
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+
+			try {
+
+				// Create the JM
+				ActorRef jm = system.actorOf(Props.create(
+					new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+
+				jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+				final int dataPort = NetUtils.getAvailablePort();
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+				config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+				taskManager = TestingUtils.createTaskManager(
+					system,
+					jobManager,
+					config,
+					false,
+					true);
+
+				// ---------------------------------------------------------------------------------
+
+				final ActorGateway tm = taskManager;
+
+				new Within(d) {
+					@Override
+					protected void run() {
+						Future<Object> logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
+						try {
+							Await.result(logFuture, timeout);
+							Assert.fail();
+						} catch (Exception e) {
+							Assert.assertTrue(e.getMessage().startsWith("TaskManager log files are unavailable. Log file could not be found at"));
+						}
+					}
+				};
+			} finally {
+				TestingUtils.stopActor(taskManager);
+				TestingUtils.stopActor(jobManager);
+			}
+		}};}
+
 	// ------------------------------------------------------------------------
 	// Stack trace sample
 	// ------------------------------------------------------------------------


[4/4] flink git commit: [FLINK-5464] [metrics] Improve MetricDumpSerialization exception handling

Posted by ch...@apache.org.
[FLINK-5464] [metrics] Improve MetricDumpSerialization exception handling

This closes #3128.
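
In short: serialize() now returns a MetricSerializationResult carrying the serialized bytes
plus one count per metric type, and both serializer and deserializer swallow per-metric
failures instead of aborting the whole dump. A round-trip sketch in Java, using only names
that appear in the diffs below (the four metric maps are assumed to be populated as in
MetricFetcherTest):

MetricDumpSerialization.MetricDumpSerializer serializer =
	new MetricDumpSerialization.MetricDumpSerializer();
MetricDumpSerialization.MetricDumpDeserializer deserializer =
	new MetricDumpSerialization.MetricDumpDeserializer();

// serialize() no longer throws; a metric that fails is logged, skipped and not counted
MetricDumpSerialization.MetricSerializationResult result =
	serializer.serialize(counters, gauges, histograms, meters);

// deserialize() tolerates a partially corrupted result: only metrics that were fully
// serialized before a failure are returned
List<MetricDump> dumps = deserializer.deserialize(result);
serializer.close();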


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8e85a2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8e85a2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8e85a2d

Branch: refs/heads/master
Commit: a8e85a2d5abcaf2defab27be0027190ac3ecb5d5
Parents: 7704724
Author: zentol <ch...@apache.org>
Authored: Thu Jan 12 12:42:26 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 23 22:18:20 2017 +0100

----------------------------------------------------------------------
 .../webmonitor/metrics/MetricFetcher.java       |   6 +-
 .../webmonitor/metrics/MetricFetcherTest.java   |   6 +-
 .../metrics/dump/MetricDumpSerialization.java   | 321 ++++++++++++-------
 .../metrics/dump/MetricQueryService.java        |   2 +-
 .../metrics/dump/MetricDumpSerializerTest.java  |  47 ++-
 .../metrics/dump/MetricQueryServiceTest.java    |  23 +-
 6 files changed, 264 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 621f4d9..7ffadce 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
@@ -42,7 +43,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -191,8 +191,8 @@ public class MetricFetcher {
 		logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
 	}
 
-	private void addMetrics(Object result) throws IOException {
-		byte[] data = (byte[]) result;
+	private void addMetrics(Object result) {
+		MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result;
 		List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
 		for (MetricDump metric : dumpedMetrics) {
 			metrics.add(metric);

http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 58048e6..d644c23 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -115,7 +115,7 @@ public class MetricFetcherTest extends TestLogger {
 
 		MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
 		when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
-			.thenReturn(Future$.MODULE$.successful((Object) new byte[16]));
+			.thenReturn(Future$.MODULE$.successful((Object) new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
 
 		MetricFetcher.BasicGateway tmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
 		when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
@@ -171,7 +171,7 @@ public class MetricFetcherTest extends TestLogger {
 		}
 	}
 
-	private static byte[] createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
+	private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
 		Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
 		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
 		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
@@ -213,7 +213,7 @@ public class MetricFetcherTest extends TestLogger {
 		histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));
 
 		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
-		byte[] dump = serializer.serialize(counters, gauges, histograms, meters);
+		MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
 		serializer.close();
 
 		return dump;

http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index 143faaf..e57a0d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -17,20 +17,23 @@
  */
 package org.apache.flink.runtime.metrics.dump;
 
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Meter;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -45,208 +48,287 @@ import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY
  * Utility class for the serialization of metrics.
  */
 public class MetricDumpSerialization {
+
 	private static final Logger LOG = LoggerFactory.getLogger(MetricDumpSerialization.class);
 
 	private MetricDumpSerialization() {
 	}
 
+	/**
+	 * This class encapsulates all serialized metrics and a count for each metric type.
+	 * 
+	 * The counts are stored separately from the metrics since the final count for any given type can only be
+	 * determined after all metrics of that type were serialized. Storing them together in a single byte[] would
+	 * require an additional copy of all serialized metrics, as you would first have to serialize the metrics into a
+	 * temporary buffer to calculate the counts, write the counts to the final output and copy all metrics from the
+	 * temporary buffer.
+	 * 
+	 * Note that while one could implement the serialization in such a way so that at least 1 byte (a validity flag)
+	 * is written for each metric, this would require more bandwidth due to the sheer number of metrics.
+	 */
+	public static class MetricSerializationResult implements Serializable {
+
+		private static final long serialVersionUID = 6928770855951536906L;
+
+		public final byte[] serializedMetrics;
+		public final int numCounters;
+		public final int numGauges;
+		public final int numMeters;
+		public final int numHistograms;
+		
+		public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) {
+			Preconditions.checkNotNull(serializedMetrics);
+			Preconditions.checkArgument(numCounters >= 0);
+			Preconditions.checkArgument(numGauges >= 0);
+			Preconditions.checkArgument(numMeters >= 0); 
+			Preconditions.checkArgument(numHistograms >= 0);
+			this.serializedMetrics = serializedMetrics;
+			this.numCounters = numCounters;
+			this.numGauges = numGauges;
+			this.numMeters = numMeters;
+			this.numHistograms = numHistograms;
+		}
+	}
+
 	//-------------------------------------------------------------------------
 	// Serialization
 	//-------------------------------------------------------------------------
+
 	public static class MetricDumpSerializer {
-		private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-		private DataOutputStream dos = new DataOutputStream(baos);
+
+		private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
 
 		/**
 		 * Serializes the given metrics and returns the resulting byte array.
+		 * 
+		 * Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned
+		 * {@link MetricSerializationResult}.
+		 * 
+		 * If the serialization of any primitive or String fails then the returned {@link MetricSerializationResult}
+		 * is partially corrupted. Such a result can be deserialized safely by 
+		 * {@link MetricDumpDeserializer#deserialize(MetricSerializationResult)}; however only metrics that were
+		 * fully serialized before the failure will be returned.
 		 *
 		 * @param counters   counters to serialize
 		 * @param gauges     gauges to serialize
 		 * @param histograms histograms to serialize
-		 * @return byte array containing the serialized metrics
-		 * @throws IOException
+		 * @return MetricSerializationResult containing the serialized metrics and the count of each metric type
 		 */
-		public byte[] serialize(
+		public MetricSerializationResult serialize(
 			Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
 			Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
 			Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
-				
-			baos.reset();
-			dos.writeInt(counters.size());
-			dos.writeInt(gauges.size());
-			dos.writeInt(histograms.size());
-			dos.writeInt(meters.size());
+			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+
+			buffer.clear();
 
+			int numCounters = 0;
 			for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
-				serializeMetricInfo(dos, entry.getValue().f0);
-				serializeString(dos, entry.getValue().f1);
-				serializeCounter(dos, entry.getKey());
+				try {
+					serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					numCounters++;
+				} catch (Exception e) {
+					LOG.debug("Failed to serialize counter.", e);
+				}
 			}
 
+			int numGauges = 0;
 			for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
-				serializeMetricInfo(dos, entry.getValue().f0);
-				serializeString(dos, entry.getValue().f1);
-				serializeGauge(dos, entry.getKey());
+				try {
+					serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					numGauges++;
+				} catch (Exception e) {
+					LOG.debug("Failed to serialize gauge.", e);
+				}
 			}
 
+			int numHistograms = 0;
 			for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
-				serializeMetricInfo(dos, entry.getValue().f0);
-				serializeString(dos, entry.getValue().f1);
-				serializeHistogram(dos, entry.getKey());
+				try {
+					serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					numHistograms++;
+				} catch (Exception e) {
+					LOG.debug("Failed to serialize histogram.", e);
+				}
 			}
 
+			int numMeters = 0;
 			for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
-				serializeMetricInfo(dos, entry.getValue().f0);
-				serializeString(dos, entry.getValue().f1);
-				serializeMeter(dos, entry.getKey());
+				try {
+					serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+					numMeters++;
+				} catch (Exception e) {
+					LOG.debug("Failed to serialize meter.", e);
+				}
 			}
-			return baos.toByteArray();
+
+			return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
 		}
 
 		public void close() {
-			try {
-				dos.close();
-			} catch (Exception e) {
-				LOG.debug("Failed to close OutputStream.", e);
-			}
-			try {
-				baos.close();
-			} catch (Exception e) {
-				LOG.debug("Failed to close OutputStream.", e);
-			}
+			buffer = null;
 		}
 	}
 
-	private static void serializeMetricInfo(DataOutputStream dos, QueryScopeInfo info) throws IOException {
-		serializeString(dos, info.scope);
-		dos.writeByte(info.getCategory());
+	private static void serializeMetricInfo(DataOutput out, QueryScopeInfo info) throws IOException {
+		out.writeUTF(info.scope);
+		out.writeByte(info.getCategory());
 		switch (info.getCategory()) {
 			case INFO_CATEGORY_JM:
 				break;
 			case INFO_CATEGORY_TM:
 				String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
-				serializeString(dos, tmID);
+				out.writeUTF(tmID);
 				break;
 			case INFO_CATEGORY_JOB:
 				QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
-				serializeString(dos, jobInfo.jobID);
+				out.writeUTF(jobInfo.jobID);
 				break;
 			case INFO_CATEGORY_TASK:
 				QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
-				serializeString(dos, taskInfo.jobID);
-				serializeString(dos, taskInfo.vertexID);
-				dos.writeInt(taskInfo.subtaskIndex);
+				out.writeUTF(taskInfo.jobID);
+				out.writeUTF(taskInfo.vertexID);
+				out.writeInt(taskInfo.subtaskIndex);
 				break;
 			case INFO_CATEGORY_OPERATOR:
 				QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
-				serializeString(dos, operatorInfo.jobID);
-				serializeString(dos, operatorInfo.vertexID);
-				dos.writeInt(operatorInfo.subtaskIndex);
-				serializeString(dos, operatorInfo.operatorName);
+				out.writeUTF(operatorInfo.jobID);
+				out.writeUTF(operatorInfo.vertexID);
+				out.writeInt(operatorInfo.subtaskIndex);
+				out.writeUTF(operatorInfo.operatorName);
 				break;
+			default:
+				throw new IOException("Unknown scope category: " + info.getCategory());
 		}
 	}
 
-	private static void serializeString(DataOutputStream dos, String string) throws IOException {
-		byte[] bytes = string.getBytes();
-		dos.writeInt(bytes.length);
-		dos.write(bytes);
+	private static void serializeCounter(DataOutput out, QueryScopeInfo info, String name, Counter counter) throws IOException {
+		long count = counter.getCount();
+		serializeMetricInfo(out, info);
+		out.writeUTF(name);
+		out.writeLong(count);
 	}
 
-	private static void serializeCounter(DataOutputStream dos, Counter counter) throws IOException {
-		dos.writeLong(counter.getCount());
-	}
+	private static void serializeGauge(DataOutput out, QueryScopeInfo info, String name, Gauge<?> gauge) throws IOException {
+		Object value = gauge.getValue();
+		if (value == null) {
+			throw new NullPointerException("Value returned by gauge " + name + " was null.");
+		}
+		String stringValue = gauge.getValue().toString();
+		if (stringValue == null) {
+			throw new NullPointerException("toString() of the value returned by gauge " + name + " returned null.");
+		}
 
-	private static void serializeGauge(DataOutputStream dos, Gauge<?> gauge) throws IOException {
-		serializeString(dos, gauge.getValue().toString());
+		serializeMetricInfo(out, info);
+		out.writeUTF(name);
+		out.writeUTF(stringValue);
 	}
 
-	private static void serializeHistogram(DataOutputStream dos, Histogram histogram) throws IOException {
+	private static void serializeHistogram(DataOutput out, QueryScopeInfo info, String name, Histogram histogram) throws IOException {
 		HistogramStatistics stat = histogram.getStatistics();
-
-		dos.writeLong(stat.getMin());
-		dos.writeLong(stat.getMax());
-		dos.writeDouble(stat.getMean());
-		dos.writeDouble(stat.getQuantile(0.5));
-		dos.writeDouble(stat.getStdDev());
-		dos.writeDouble(stat.getQuantile(0.75));
-		dos.writeDouble(stat.getQuantile(0.90));
-		dos.writeDouble(stat.getQuantile(0.95));
-		dos.writeDouble(stat.getQuantile(0.98));
-		dos.writeDouble(stat.getQuantile(0.99));
-		dos.writeDouble(stat.getQuantile(0.999));
+		long min = stat.getMin();
+		long max = stat.getMax();
+		double mean = stat.getMean();
+		double median = stat.getQuantile(0.5);
+		double stddev = stat.getStdDev();
+		double p75 = stat.getQuantile(0.75);
+		double p90 = stat.getQuantile(0.90);
+		double p95 = stat.getQuantile(0.95);
+		double p98 = stat.getQuantile(0.98);
+		double p99 = stat.getQuantile(0.99);
+		double p999 = stat.getQuantile(0.999);
+
+		serializeMetricInfo(out, info);
+		out.writeUTF(name);
+		out.writeLong(min);
+		out.writeLong(max);
+		out.writeDouble(mean);
+		out.writeDouble(median);
+		out.writeDouble(stddev);
+		out.writeDouble(p75);
+		out.writeDouble(p90);
+		out.writeDouble(p95);
+		out.writeDouble(p98);
+		out.writeDouble(p99);
+		out.writeDouble(p999);
 	}
 
-	private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOException {
-		dos.writeDouble(meter.getRate());
+	private static void serializeMeter(DataOutput out, QueryScopeInfo info, String name, Meter meter) throws IOException {
+		serializeMetricInfo(out, info);
+		out.writeUTF(name);
+		out.writeDouble(meter.getRate());
 	}
 
 	//-------------------------------------------------------------------------
 	// Deserialization
 	//-------------------------------------------------------------------------
+
 	public static class MetricDumpDeserializer {
 		/**
 		 * De-serializes metrics from the given byte array and returns them as a list of {@link MetricDump}.
 		 *
 		 * @param data serialized metrics
 		 * @return A list containing the deserialized metrics.
-		 * @throws IOException
 		 */
-		public List<MetricDump> deserialize(byte[] data) throws IOException {
-			ByteArrayInputStream bais = new ByteArrayInputStream(data);
-			DataInputStream dis = new DataInputStream(bais);
-
-			int numCounters = dis.readInt();
-			int numGauges = dis.readInt();
-			int numHistograms = dis.readInt();
-			int numMeters = dis.readInt();
+		public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
+			DataInputView in = new DataInputDeserializer(data.serializedMetrics, 0, data.serializedMetrics.length);
 
-			List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
+			List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
 
-			for (int x = 0; x < numCounters; x++) {
-				metrics.add(deserializeCounter(dis));
+			for (int x = 0; x < data.numCounters; x++) {
+				try {
+					metrics.add(deserializeCounter(in));
+				} catch (Exception e) {
+					LOG.debug("Failed to deserialize counter.", e);
+				}
 			}
 
-			for (int x = 0; x < numGauges; x++) {
-				metrics.add(deserializeGauge(dis));
+			for (int x = 0; x < data.numGauges; x++) {
+				try {
+					metrics.add(deserializeGauge(in));
+				} catch (Exception e) {
+					LOG.debug("Failed to deserialize gauge.", e);
+				}
 			}
 
-			for (int x = 0; x < numHistograms; x++) {
-				metrics.add(deserializeHistogram(dis));
+			for (int x = 0; x < data.numHistograms; x++) {
+				try {
+					metrics.add(deserializeHistogram(in));
+				} catch (Exception e) {
+					LOG.debug("Failed to deserialize histogram.", e);
+				}
 			}
 
-			for (int x = 0; x < numMeters; x++) {
-				metrics.add(deserializeMeter(dis));
+			for (int x = 0; x < data.numMeters; x++) {
+				try {
+					metrics.add(deserializeMeter(in));
+				} catch (Exception e) {
+					LOG.debug("Failed to deserialize meter.", e);
+				}
 			}
 
 			return metrics;
 		}
 	}
 
-	private static String deserializeString(DataInputStream dis) throws IOException {
-		int stringLength = dis.readInt();
-		byte[] bytes = new byte[stringLength];
-		dis.readFully(bytes);
-		return new String(bytes);
-	}
 
-	private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException {
+	private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException {
 		QueryScopeInfo scope = deserializeMetricInfo(dis);
-		String name = deserializeString(dis);
-		return new MetricDump.CounterDump(scope, name, dis.readLong());
+		String name = dis.readUTF();
+		long count = dis.readLong();
+		return new MetricDump.CounterDump(scope, name, count);
 	}
 
-	private static MetricDump.GaugeDump deserializeGauge(DataInputStream dis) throws IOException {
+	private static MetricDump.GaugeDump deserializeGauge(DataInputView dis) throws IOException {
 		QueryScopeInfo scope = deserializeMetricInfo(dis);
-		String name = deserializeString(dis);
-		String value = deserializeString(dis);
+		String name = dis.readUTF();
+		String value = dis.readUTF();
 		return new MetricDump.GaugeDump(scope, name, value);
 	}
 
-	private static MetricDump.HistogramDump deserializeHistogram(DataInputStream dis) throws IOException {
+	private static MetricDump.HistogramDump deserializeHistogram(DataInputView dis) throws IOException {
 		QueryScopeInfo info = deserializeMetricInfo(dis);
-		String name = deserializeString(dis);
+		String name = dis.readUTF();
 		long min = dis.readLong();
 		long max = dis.readLong();
 		double mean = dis.readDouble();
@@ -258,45 +340,46 @@ public class MetricDumpSerialization {
 		double p98 = dis.readDouble();
 		double p99 = dis.readDouble();
 		double p999 = dis.readDouble();
+
 		return new MetricDump.HistogramDump(info, name, min, max, mean, median, stddev, p75, p90, p95, p98, p99, p999);
 	}
 
-	private static MetricDump.MeterDump deserializeMeter(DataInputStream dis) throws IOException {
+	private static MetricDump.MeterDump deserializeMeter(DataInputView dis) throws IOException {
 		QueryScopeInfo info = deserializeMetricInfo(dis);
-		String name = deserializeString(dis);
+		String name = dis.readUTF();
 		double rate = dis.readDouble();
 		return new MetricDump.MeterDump(info, name, rate);
 	}
 
-	private static QueryScopeInfo deserializeMetricInfo(DataInputStream dis) throws IOException {
+	private static QueryScopeInfo deserializeMetricInfo(DataInput dis) throws IOException {
 		String jobID;
 		String vertexID;
 		int subtaskIndex;
 
-		String scope = deserializeString(dis);
+		String scope = dis.readUTF();
 		byte cat = dis.readByte();
 		switch (cat) {
 			case INFO_CATEGORY_JM:
 				return new QueryScopeInfo.JobManagerQueryScopeInfo(scope);
 			case INFO_CATEGORY_TM:
-				String tmID = deserializeString(dis);
+				String tmID = dis.readUTF();
 				return new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID, scope);
 			case INFO_CATEGORY_JOB:
-				jobID = deserializeString(dis);
+				jobID = dis.readUTF();
 				return new QueryScopeInfo.JobQueryScopeInfo(jobID, scope);
 			case INFO_CATEGORY_TASK:
-				jobID = deserializeString(dis);
-				vertexID = deserializeString(dis);
+				jobID = dis.readUTF();
+				vertexID = dis.readUTF();
 				subtaskIndex = dis.readInt();
 				return new QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope);
 			case INFO_CATEGORY_OPERATOR:
-				jobID = deserializeString(dis);
-				vertexID = deserializeString(dis);
+				jobID = dis.readUTF();
+				vertexID = dis.readUTF();
 				subtaskIndex = dis.readInt();
-				String operatorName = deserializeString(dis);
+				String operatorName = dis.readUTF();
 				return new QueryScopeInfo.OperatorQueryScopeInfo(jobID, vertexID, subtaskIndex, operatorName, scope);
 			default:
-				throw new IOException("sup");
+				throw new IOException("Unknown scope category: " + cat);
 		}
 	}
 }
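
Worth noting in the diff above: the hand-rolled serializeString/deserializeString pair is
replaced by DataOutput#writeUTF and DataInput#readUTF, which prefix each string with a
two-byte length in modified UTF-8 (so any single string is capped at 65535 encoded bytes).
A small round-trip sketch using the same Flink utility classes as the diff (the buffer size
is an arbitrary choice for the example):

DataOutputSerializer out = new DataOutputSerializer(64);
out.writeUTF("vertexID");
byte[] bytes = out.getCopyOfBuffer();
DataInputDeserializer in = new DataInputDeserializer(bytes, 0, bytes.length);
String roundTripped = in.readUTF(); // "vertexID"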

http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 20bc258..2229139 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -106,7 +106,7 @@ public class MetricQueryService extends UntypedActor {
 					this.meters.remove(metric);
 				}
 			} else if (message instanceof CreateDump) {
-				byte[] dump = serializer.serialize(counters, gauges, histograms, meters);
+				MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
 				getSender().tell(dump, getSelf());
 			} else {
 				LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 18f03e3..6e3d8f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -24,9 +24,13 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +46,47 @@ import static org.junit.Assert.fail;
 
 public class MetricDumpSerializerTest {
 	@Test
+	public void testNullGaugeHandling() throws IOException {
+		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+		MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();
+
+		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+		
+		gauges.put(new Gauge<Object>() {
+			@Override
+			public Object getValue() {
+				return null;
+			}
+		}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("A"), "g"));
+		
+		MetricDumpSerialization.MetricSerializationResult output = serializer.serialize(
+			Collections.<Counter, Tuple2<QueryScopeInfo,String>>emptyMap(),
+			gauges,
+			Collections.<Histogram, Tuple2<QueryScopeInfo, String>>emptyMap(),
+			Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());
+		
+		// no metrics should be serialized
+		Assert.assertEquals(0, output.serializedMetrics.length);
+
+		List<MetricDump> deserialized = deserializer.deserialize(output);
+		Assert.assertEquals(0, deserialized.size());
+	}
+
+	@Test
+	public void testJavaSerialization() throws IOException {
+		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+
+		final ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
+		final ObjectOutputStream oos = new ObjectOutputStream(bos);
+
+		oos.writeObject(serializer.serialize(
+			new HashMap<Counter, Tuple2<QueryScopeInfo,String>>(),
+			new HashMap<Gauge<?>, Tuple2<QueryScopeInfo,String>>(),
+			new HashMap<Histogram, Tuple2<QueryScopeInfo,String>>(),
+			new HashMap<Meter, Tuple2<QueryScopeInfo,String>>()));
+	}
+
+	@Test
 	public void testSerialization() throws IOException {
 		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
 		MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();
@@ -92,7 +137,7 @@ public class MetricDumpSerializerTest {
 		gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
 		histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1"));
 
-		byte[] serialized = serializer.serialize(counters, gauges, histograms, meters);
+		MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(counters, gauges, histograms, meters);
 		List<MetricDump> deserialized = deserializer.deserialize(serialized);
 
 		// ===== Counters ==============================================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/a8e85a2d/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 0104e3e..2243495 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -84,13 +84,6 @@ public class MetricQueryServiceTest extends TestLogger {
 		MetricQueryService.notifyOfAddedMetric(serviceActor, g, "gauge", tm);
 		MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
 		MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
-
-		// these metrics will be removed *after* the first query
-		MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
-		MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
-		MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
-		MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
-
 		serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
 		synchronized (testActor.lock) {
 			if (testActor.message == null) {
@@ -98,9 +91,14 @@ public class MetricQueryServiceTest extends TestLogger {
 			}
 		}
 
-		byte[] dump = (byte[]) testActor.message;
+		MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
 		testActor.message = null;
-		assertTrue(dump.length > 0);
+		assertTrue(dump.serializedMetrics.length > 0);
+
+		MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
+		MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
+		MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
+		MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
 
 		serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
 		synchronized (testActor.lock) {
@@ -109,12 +107,9 @@ public class MetricQueryServiceTest extends TestLogger {
 			}
 		}
 
-		byte[] emptyDump = (byte[]) testActor.message;
+		MetricDumpSerialization.MetricSerializationResult emptyDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
 		testActor.message = null;
-		assertEquals(16, emptyDump.length);
-		for (int x = 0; x < 16; x++) {
-			assertEquals(0, emptyDump[x]);
-		}
+		assertEquals(0, emptyDump.serializedMetrics.length);
 
 		s.shutdown();
 	}


[3/4] flink git commit: [FLINK-5464] [metrics] Ignore metrics that are null

Posted by ch...@apache.org.
[FLINK-5464] [metrics] Ignore metrics that are null
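
Every counter/gauge/histogram/meter registration funnels through
AbstractMetricGroup#addMetric, so the added guard means a null metric is dropped with a
warning at registration time instead of surfacing later, e.g. as a NullPointerException
while serializing a metric dump. A hedged example of the effect (group stands for any
metric group instance):

Counter counter = null;
group.counter("requests", counter); // now logged and ignored, nothing is registered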


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77047244
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77047244
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77047244

Branch: refs/heads/master
Commit: 77047244a1ca2456c2b9fdf7083dd3a7c3aba029
Parents: 02d7e4a
Author: zentol <ch...@apache.org>
Authored: Thu Jan 12 12:41:56 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Jan 23 22:18:20 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/metrics/groups/AbstractMetricGroup.java | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/77047244/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 6ff9776..a19970d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -345,6 +345,10 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	 * @param metric the metric to register
 	 */
 	protected void addMetric(String name, Metric metric) {
+		if (metric == null) {
+			LOG.warn("Ignoring attempted registration of a metric due to being null for name {}.", name);
+			return;
+		}
 		// add the metric only if the group is still open
 		synchronized (this) {
 			if (!closed) {