You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2017/01/16 12:30:41 UTC

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/3128

    [FLINK-5464] Improve MetricDumpSerialization error handling

    Rework of #3103.
    
    The key change introduced in the previous PR remains; if a gauge returns null it is not serialized.
    
    However, I've extended the PR to harden the entire serialization process against exceptions. The major gain here is that a single failed serialization does no longer destroys the entire dump; instead it is simply omitted.
    
    In order to allow that I had to replace the ```OutputStream```s with a ```ByteBuffer```. The former doesn't really allow you to handle failures in between serialization steps, as you can't reset the stream in any way. The ```ByteBuffer``` is manually resized if a ```BufferOverflowException``` occurs.
    
    * ```MetricDump(De)Serializer#(de)serialize``` will no longer throw any exception but catch and log them instead
    * Exceptions during the serialization of a metric will cause that metric to be skipped.
    * added test for handling of gauge returning null
    * added test for manual resizing of backing array

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 5464_mqs_npe

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3128.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3128
    
----
commit 8610a47c407afe2140cd4b5651ebc794ef3feec8
Author: zentol <ch...@apache.org>
Date:   2017-01-12T11:41:56Z

    [FLINK-5464] [metrics] Ignore metrics that are null

commit 442c0a4dee002b73e5b86d6c7bb274484a8900ac
Author: zentol <ch...@apache.org>
Date:   2017-01-16T10:25:58Z

    [hotfix] Remove unused variable in MetricDumpSerializerTest

commit 0f813ebf53414b1b68c6dfe8e3e1dbc896054c36
Author: zentol <ch...@apache.org>
Date:   2017-01-12T11:42:26Z

    [FLINK-5464] [metrics] Improve MetricDumpSerialization exception handling

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97327512
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
     		 *
     		 * @param data serialized metrics
     		 * @return A list containing the deserialized metrics.
    -		 * @throws IOException
     		 */
    -		public List<MetricDump> deserialize(byte[] data) throws IOException {
    -			ByteArrayInputStream bais = new ByteArrayInputStream(data);
    -			DataInputStream dis = new DataInputStream(bais);
     
    -			int numCounters = dis.readInt();
    -			int numGauges = dis.readInt();
    -			int numHistograms = dis.readInt();
    -			int numMeters = dis.readInt();
    +		public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
    +			DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);
     
    -			List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
    +			List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
     
    -			for (int x = 0; x < numCounters; x++) {
    -				metrics.add(deserializeCounter(dis));
    +			for (int x = 0; x < data.numCounters; x++) {
    +				try {
    +					metrics.add(deserializeCounter(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numGauges; x++) {
    -				metrics.add(deserializeGauge(dis));
    +			for (int x = 0; x < data.numGauges; x++) {
    +				try {
    +					metrics.add(deserializeGauge(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numHistograms; x++) {
    -				metrics.add(deserializeHistogram(dis));
    +			for (int x = 0; x < data.numHistograms; x++) {
    +				try {
    +					metrics.add(deserializeHistogram(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numMeters; x++) {
    -				metrics.add(deserializeMeter(dis));
    +			for (int x = 0; x < data.numMeters; x++) {
    +				try {
    +					metrics.add(deserializeMeter(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
    -
    -			return metrics;
    --- End diff --
    
    It appears i forgot to push some of the last-minute fixes :/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97343787
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -70,16 +93,26 @@ public MetricSerializationResult(byte[] data, int numCounters, int numGauges, in
     	//-------------------------------------------------------------------------
     	// Serialization
     	//-------------------------------------------------------------------------
    +
     	public static class MetricDumpSerializer {
    +
     		private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
     
     		/**
     		 * Serializes the given metrics and returns the resulting byte array.
    +		 * 
    +		 * Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned
    --- End diff --
    
    Very nice 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97327675
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
     		 *
     		 * @param data serialized metrics
     		 * @return A list containing the deserialized metrics.
    -		 * @throws IOException
     		 */
    -		public List<MetricDump> deserialize(byte[] data) throws IOException {
    -			ByteArrayInputStream bais = new ByteArrayInputStream(data);
    -			DataInputStream dis = new DataInputStream(bais);
     
    -			int numCounters = dis.readInt();
    -			int numGauges = dis.readInt();
    -			int numHistograms = dis.readInt();
    -			int numMeters = dis.readInt();
    +		public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
    +			DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);
     
    -			List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
    +			List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
     
    -			for (int x = 0; x < numCounters; x++) {
    -				metrics.add(deserializeCounter(dis));
    +			for (int x = 0; x < data.numCounters; x++) {
    +				try {
    +					metrics.add(deserializeCounter(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numGauges; x++) {
    -				metrics.add(deserializeGauge(dis));
    +			for (int x = 0; x < data.numGauges; x++) {
    +				try {
    +					metrics.add(deserializeGauge(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numHistograms; x++) {
    -				metrics.add(deserializeHistogram(dis));
    +			for (int x = 0; x < data.numHistograms; x++) {
    +				try {
    +					metrics.add(deserializeHistogram(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numMeters; x++) {
    -				metrics.add(deserializeMeter(dis));
    +			for (int x = 0; x < data.numMeters; x++) {
    +				try {
    +					metrics.add(deserializeMeter(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
    -
    -			return metrics;
     		}
     	}
     
    -	private static String deserializeString(DataInputStream dis) throws IOException {
    -		int stringLength = dis.readInt();
    -		byte[] bytes = new byte[stringLength];
    -		dis.readFully(bytes);
    -		return new String(bytes);
    -	}
     
    -	private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException {
    +	private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException {
     		QueryScopeInfo scope = deserializeMetricInfo(dis);
    -		String name = deserializeString(dis);
    -		return new MetricDump.CounterDump(scope, name, dis.readLong());
    +		String name = dis.readUTF();
    +		long count = dis.readLong();
    +		return new MetricDump.CounterDump(scope, name, count);
    --- End diff --
    
    Personally for short methods i think it's overkill. I would do it for methods like `deserializaHistogram` though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97316951
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
     		 *
     		 * @param data serialized metrics
     		 * @return A list containing the deserialized metrics.
    -		 * @throws IOException
     		 */
    -		public List<MetricDump> deserialize(byte[] data) throws IOException {
    -			ByteArrayInputStream bais = new ByteArrayInputStream(data);
    -			DataInputStream dis = new DataInputStream(bais);
     
    -			int numCounters = dis.readInt();
    -			int numGauges = dis.readInt();
    -			int numHistograms = dis.readInt();
    -			int numMeters = dis.readInt();
    +		public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
    --- End diff --
    
    Furthermore, after that line. And there is an empty line after the JavaDoc of `deserialize`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97316717
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -50,12 +51,27 @@
     	private MetricDumpSerialization() {
     	}
     
    +	public static class MetricSerializationResult {
    +		public final byte[] data;
    +		public final int numCounters;
    +		public final int numGauges;
    +		public final int numMeters;
    +		public final int numHistograms;
    +		
    +		public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
    --- End diff --
    
    Let's decrease the visibility of the constructor as much as possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97314773
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -50,12 +51,27 @@
     	private MetricDumpSerialization() {
     	}
     
    +	public static class MetricSerializationResult {
    +		public final byte[] data;
    --- End diff --
    
    Wonderning whether to call this `serializedMetrics`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97315450
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -64,122 +80,148 @@ private MetricDumpSerialization() {
     		 * @param gauges     gauges to serialize
     		 * @param histograms histograms to serialize
     		 * @return byte array containing the serialized metrics
    -		 * @throws IOException
     		 */
    -		public byte[] serialize(
    +		public MetricSerializationResult serialize(
     			Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
     			Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
     			Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
    -			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
    -				
    -			baos.reset();
    -			dos.writeInt(counters.size());
    -			dos.writeInt(gauges.size());
    -			dos.writeInt(histograms.size());
    -			dos.writeInt(meters.size());
    +			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
    +
    +			buffer.clear();
     
    +			int numCounters = 0;
     			for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeCounter(dos, entry.getKey());
    +				try {
    +					serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numCounters++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize counter.", e);
    +				}
     			}
     
    +			int numGauges = 0;
     			for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeGauge(dos, entry.getKey());
    +				try {
    +					serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numGauges++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize gauge.", e);
    +				}
     			}
     
    +			int numHistograms = 0;
     			for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeHistogram(dos, entry.getKey());
    +				try {
    +					serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numHistograms++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize histogram.", e);
    +				}
     			}
     
    +			int numMeters = 0;
     			for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeMeter(dos, entry.getKey());
    +				try {
    +					serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numMeters++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize meter.", e);
    +				}
     			}
    -			return baos.toByteArray();
    +			return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
    --- End diff --
    
    Empty line before return?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3128: [FLINK-5464] Improve MetricDumpSerialization error handli...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3128
  
    I've had an offline chat with @rmetzger and @uce. We agreed that using a ByteBuffer and resizing it manually was a bit undesirable.
    
    Instead we opted for the following approach:
    * use DataOutputSerializer instead of DataOutputStream; it is a bit more efficient of strings, which make up the majority of serialized data, and is also backed by a resizing array
    * restructure the serialize methods to be symmetric with the deserialize methods
    * Access the metric values before serializing anything and reduce them to primitives or strings. The assumption is that if this succeeds the following serialization will succeed; and can only fail due to critical errors that will prevent serialization completely or programming errors on our part.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97314569
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -17,19 +17,20 @@
      */
     package org.apache.flink.runtime.metrics.dump;
     
    -import org.apache.commons.io.output.ByteArrayOutputStream;
     import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.metrics.Counter;
     import org.apache.flink.metrics.Gauge;
     import org.apache.flink.metrics.Histogram;
     import org.apache.flink.metrics.HistogramStatistics;
     import org.apache.flink.metrics.Meter;
    +import org.apache.flink.runtime.util.DataInputDeserializer;
    +import org.apache.flink.runtime.util.DataOutputSerializer;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.io.ByteArrayInputStream;
    -import java.io.DataInputStream;
    -import java.io.DataOutputStream;
    +import java.io.DataInput;
    +import java.io.DataOutput;
     import java.io.IOException;
     import java.util.ArrayList;
     import java.util.List;
    --- End diff --
    
    In line 49 (above the `LOG` field) an empty line is missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97314949
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -50,12 +51,27 @@
     	private MetricDumpSerialization() {
     	}
     
    +	public static class MetricSerializationResult {
    --- End diff --
    
    Can you add a short class level comment about why we added this? The problem with determining number of metrics before hand etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3128: [FLINK-5464] Improve MetricDumpSerialization error handli...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3128
  
    Merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97317481
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
     		 *
     		 * @param data serialized metrics
     		 * @return A list containing the deserialized metrics.
    -		 * @throws IOException
     		 */
    -		public List<MetricDump> deserialize(byte[] data) throws IOException {
    -			ByteArrayInputStream bais = new ByteArrayInputStream(data);
    -			DataInputStream dis = new DataInputStream(bais);
     
    -			int numCounters = dis.readInt();
    -			int numGauges = dis.readInt();
    -			int numHistograms = dis.readInt();
    -			int numMeters = dis.readInt();
    +		public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
    +			DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);
     
    -			List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
    +			List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
     
    -			for (int x = 0; x < numCounters; x++) {
    -				metrics.add(deserializeCounter(dis));
    +			for (int x = 0; x < data.numCounters; x++) {
    +				try {
    +					metrics.add(deserializeCounter(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numGauges; x++) {
    -				metrics.add(deserializeGauge(dis));
    +			for (int x = 0; x < data.numGauges; x++) {
    +				try {
    +					metrics.add(deserializeGauge(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numHistograms; x++) {
    -				metrics.add(deserializeHistogram(dis));
    +			for (int x = 0; x < data.numHistograms; x++) {
    +				try {
    +					metrics.add(deserializeHistogram(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numMeters; x++) {
    -				metrics.add(deserializeMeter(dis));
    +			for (int x = 0; x < data.numMeters; x++) {
    +				try {
    +					metrics.add(deserializeMeter(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
    -
    -			return metrics;
     		}
     	}
     
    -	private static String deserializeString(DataInputStream dis) throws IOException {
    -		int stringLength = dis.readInt();
    -		byte[] bytes = new byte[stringLength];
    -		dis.readFully(bytes);
    -		return new String(bytes);
    -	}
     
    -	private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException {
    +	private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException {
     		QueryScopeInfo scope = deserializeMetricInfo(dis);
    -		String name = deserializeString(dis);
    -		return new MetricDump.CounterDump(scope, name, dis.readLong());
    +		String name = dis.readUTF();
    +		long count = dis.readLong();
    +		return new MetricDump.CounterDump(scope, name, count);
    --- End diff --
    
    Should we add an empty line before the `return`s?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97315979
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -64,122 +80,148 @@ private MetricDumpSerialization() {
     		 * @param gauges     gauges to serialize
     		 * @param histograms histograms to serialize
     		 * @return byte array containing the serialized metrics
    -		 * @throws IOException
     		 */
    -		public byte[] serialize(
    +		public MetricSerializationResult serialize(
     			Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
     			Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
     			Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
    -			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
    -				
    -			baos.reset();
    -			dos.writeInt(counters.size());
    -			dos.writeInt(gauges.size());
    -			dos.writeInt(histograms.size());
    -			dos.writeInt(meters.size());
    +			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
    +
    +			buffer.clear();
     
    +			int numCounters = 0;
     			for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeCounter(dos, entry.getKey());
    +				try {
    +					serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numCounters++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize counter.", e);
    +				}
     			}
     
    +			int numGauges = 0;
     			for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeGauge(dos, entry.getKey());
    +				try {
    +					serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numGauges++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize gauge.", e);
    +				}
     			}
     
    +			int numHistograms = 0;
     			for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeHistogram(dos, entry.getKey());
    +				try {
    +					serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numHistograms++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize histogram.", e);
    +				}
     			}
     
    +			int numMeters = 0;
     			for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeMeter(dos, entry.getKey());
    +				try {
    +					serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numMeters++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize meter.", e);
    +				}
     			}
    -			return baos.toByteArray();
    +			return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
     		}
     
     		public void close() {
    -			try {
    -				dos.close();
    -			} catch (Exception e) {
    -				LOG.debug("Failed to close OutputStream.", e);
    -			}
    -			try {
    -				baos.close();
    -			} catch (Exception e) {
    -				LOG.debug("Failed to close OutputStream.", e);
    -			}
    +			buffer = null;
     		}
     	}
     
    -	private static void serializeMetricInfo(DataOutputStream dos, QueryScopeInfo info) throws IOException {
    -		serializeString(dos, info.scope);
    -		dos.writeByte(info.getCategory());
    +	private static void serializeMetricInfo(DataOutput out, QueryScopeInfo info) throws IOException {
    +		out.writeUTF(info.scope);
    +		out.writeByte(info.getCategory());
     		switch (info.getCategory()) {
     			case INFO_CATEGORY_JM:
     				break;
     			case INFO_CATEGORY_TM:
     				String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
    -				serializeString(dos, tmID);
    +				out.writeUTF(tmID);
     				break;
     			case INFO_CATEGORY_JOB:
     				QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
    -				serializeString(dos, jobInfo.jobID);
    +				out.writeUTF(jobInfo.jobID);
     				break;
     			case INFO_CATEGORY_TASK:
     				QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
    -				serializeString(dos, taskInfo.jobID);
    -				serializeString(dos, taskInfo.vertexID);
    -				dos.writeInt(taskInfo.subtaskIndex);
    +				out.writeUTF(taskInfo.jobID);
    +				out.writeUTF(taskInfo.vertexID);
    +				out.writeInt(taskInfo.subtaskIndex);
     				break;
     			case INFO_CATEGORY_OPERATOR:
     				QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
    -				serializeString(dos, operatorInfo.jobID);
    -				serializeString(dos, operatorInfo.vertexID);
    -				dos.writeInt(operatorInfo.subtaskIndex);
    -				serializeString(dos, operatorInfo.operatorName);
    +				out.writeUTF(operatorInfo.jobID);
    +				out.writeUTF(operatorInfo.vertexID);
    +				out.writeInt(operatorInfo.subtaskIndex);
    +				out.writeUTF(operatorInfo.operatorName);
     				break;
    +			default:
    +				throw new IOException("Unknown scope category: " + info.getCategory());
     		}
     	}
     
    -	private static void serializeString(DataOutputStream dos, String string) throws IOException {
    -		byte[] bytes = string.getBytes();
    -		dos.writeInt(bytes.length);
    -		dos.write(bytes);
    +	private static void serializeCounter(DataOutput out, QueryScopeInfo info, String name, Counter counter) throws IOException {
    +		long count = counter.getCount();
    +		serializeMetricInfo(out, info);
    +		out.writeUTF(name);
    +		out.writeLong(count);
     	}
     
    -	private static void serializeCounter(DataOutputStream dos, Counter counter) throws IOException {
    -		dos.writeLong(counter.getCount());
    -	}
    -
    -	private static void serializeGauge(DataOutputStream dos, Gauge<?> gauge) throws IOException {
    -		serializeString(dos, gauge.getValue().toString());
    +	private static void serializeGauge(DataOutput out, QueryScopeInfo info, String name, Gauge<?> gauge) throws IOException {
    +		Object value = gauge.getValue();
    +		if (value == null) {
    +			throw new NullPointerException("Value returned by gauge " + name + " was null.");
    +		}
    +		String stringValue = gauge.getValue().toString();
    +		if (stringValue == null) {
    +			throw new NullPointerException("toString() of the value returned by gauge " + name + " returned null.");
    +		}
    +		serializeMetricInfo(out, info);
    +		out.writeUTF(name);
    +		out.writeUTF(stringValue);
     	}
     
    -	private static void serializeHistogram(DataOutputStream dos, Histogram histogram) throws IOException {
    +	private static void serializeHistogram(DataOutput out, QueryScopeInfo info, String name, Histogram histogram) throws IOException {
     		HistogramStatistics stat = histogram.getStatistics();
    -
    -		dos.writeLong(stat.getMin());
    -		dos.writeLong(stat.getMax());
    -		dos.writeDouble(stat.getMean());
    -		dos.writeDouble(stat.getQuantile(0.5));
    -		dos.writeDouble(stat.getStdDev());
    -		dos.writeDouble(stat.getQuantile(0.75));
    -		dos.writeDouble(stat.getQuantile(0.90));
    -		dos.writeDouble(stat.getQuantile(0.95));
    -		dos.writeDouble(stat.getQuantile(0.98));
    -		dos.writeDouble(stat.getQuantile(0.99));
    -		dos.writeDouble(stat.getQuantile(0.999));
    +		long min = stat.getMin();
    +		long max = stat.getMax();
    +		double mean = stat.getMean();
    +		double mediam = stat.getQuantile(0.5);
    --- End diff --
    
    Typo `mediam` -> `median`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3128: [FLINK-5464] Improve MetricDumpSerialization error handli...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3128
  
    @uce I've addressed your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97315164
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -50,12 +51,27 @@
     	private MetricDumpSerialization() {
     	}
     
    +	public static class MetricSerializationResult {
    +		public final byte[] data;
    +		public final int numCounters;
    +		public final int numGauges;
    +		public final int numMeters;
    +		public final int numHistograms;
    +		
    +		public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
    +			this.data = data;
    --- End diff --
    
    Let's add simple sanity checks `checkNotNull(data)` and `checkArgument(num* >= 0)` for the other fields.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97319035
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -50,12 +51,27 @@
     	private MetricDumpSerialization() {
     	}
     
    +	public static class MetricSerializationResult {
    --- End diff --
    
    This is not `Serializable` and would fail when send with Akka. The following test fails:
    ```java 
    @Test
    public void testJavaSerialization() throws IOException {
    	MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
    
    	final ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
    	final ObjectOutputStream oos = new ObjectOutputStream(bos);
    
    	oos.writeObject(serializer.serialize(
    		new HashMap<Counter, Tuple2<QueryScopeInfo,String>>(),
    		new HashMap<Gauge<?>, Tuple2<QueryScopeInfo,String>>(),
    		new HashMap<Histogram, Tuple2<QueryScopeInfo,String>>(),
    		new HashMap<Meter, Tuple2<QueryScopeInfo,String>>()));
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97316854
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
     		 *
     		 * @param data serialized metrics
     		 * @return A list containing the deserialized metrics.
    -		 * @throws IOException
     		 */
    -		public List<MetricDump> deserialize(byte[] data) throws IOException {
    -			ByteArrayInputStream bais = new ByteArrayInputStream(data);
    -			DataInputStream dis = new DataInputStream(bais);
     
    -			int numCounters = dis.readInt();
    -			int numGauges = dis.readInt();
    -			int numHistograms = dis.readInt();
    -			int numMeters = dis.readInt();
    +		public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
    --- End diff --
    
    In line 230, an empty line is missing before `class MetricDumpDeserializer`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97316605
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -64,122 +80,148 @@ private MetricDumpSerialization() {
     		 * @param gauges     gauges to serialize
     		 * @param histograms histograms to serialize
     		 * @return byte array containing the serialized metrics
    -		 * @throws IOException
     		 */
    -		public byte[] serialize(
    +		public MetricSerializationResult serialize(
     			Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
     			Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
     			Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
    -			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
    -				
    -			baos.reset();
    -			dos.writeInt(counters.size());
    -			dos.writeInt(gauges.size());
    -			dos.writeInt(histograms.size());
    -			dos.writeInt(meters.size());
    +			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
    +
    +			buffer.clear();
     
    +			int numCounters = 0;
     			for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeCounter(dos, entry.getKey());
    +				try {
    +					serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numCounters++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize counter.", e);
    +				}
     			}
     
    +			int numGauges = 0;
     			for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeGauge(dos, entry.getKey());
    +				try {
    +					serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numGauges++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize gauge.", e);
    +				}
     			}
     
    +			int numHistograms = 0;
     			for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeHistogram(dos, entry.getKey());
    +				try {
    +					serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numHistograms++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize histogram.", e);
    +				}
     			}
     
    +			int numMeters = 0;
     			for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
    -				serializeMetricInfo(dos, entry.getValue().f0);
    -				serializeString(dos, entry.getValue().f1);
    -				serializeMeter(dos, entry.getKey());
    +				try {
    +					serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
    +					numMeters++;
    +				} catch (Exception e) {
    +					LOG.warn("Failed to serialize meter.", e);
    --- End diff --
    
    Should we decrease the log level to `debug` (here and the other lines)? The user won't be able to act on the warning and in most cases this should work and we don't risk printing out many log messages (as happened before a couple of times).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97315205
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -50,12 +51,27 @@
     	private MetricDumpSerialization() {
     	}
     
    +	public static class MetricSerializationResult {
    +		public final byte[] data;
    +		public final int numCounters;
    +		public final int numGauges;
    +		public final int numMeters;
    +		public final int numHistograms;
    +		
    +		public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
    +			this.data = data;
    +			this.numCounters = numCounters;
    +			this.numGauges = numGauges;
    +			this.numMeters = numMeters;
    +			this.numHistograms = numHistograms;
    +		}
    +	}
    +
     	//-------------------------------------------------------------------------
     	// Serialization
     	//-------------------------------------------------------------------------
     	public static class MetricDumpSerializer {
    -		private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
    -		private DataOutputStream dos = new DataOutputStream(baos);
    +		private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
    --- End diff --
    
    Empty line missing before this line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by zentol <gi...@git.apache.org>.
Github user zentol closed the pull request at:

    https://github.com/apache/flink/pull/3128


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97317434
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
     		 *
     		 * @param data serialized metrics
     		 * @return A list containing the deserialized metrics.
    -		 * @throws IOException
     		 */
    -		public List<MetricDump> deserialize(byte[] data) throws IOException {
    -			ByteArrayInputStream bais = new ByteArrayInputStream(data);
    -			DataInputStream dis = new DataInputStream(bais);
     
    -			int numCounters = dis.readInt();
    -			int numGauges = dis.readInt();
    -			int numHistograms = dis.readInt();
    -			int numMeters = dis.readInt();
    +		public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
    +			DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);
     
    -			List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
    +			List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
     
    -			for (int x = 0; x < numCounters; x++) {
    -				metrics.add(deserializeCounter(dis));
    +			for (int x = 0; x < data.numCounters; x++) {
    +				try {
    +					metrics.add(deserializeCounter(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numGauges; x++) {
    -				metrics.add(deserializeGauge(dis));
    +			for (int x = 0; x < data.numGauges; x++) {
    +				try {
    +					metrics.add(deserializeGauge(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numHistograms; x++) {
    -				metrics.add(deserializeHistogram(dis));
    +			for (int x = 0; x < data.numHistograms; x++) {
    +				try {
    +					metrics.add(deserializeHistogram(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
     
    -			for (int x = 0; x < numMeters; x++) {
    -				metrics.add(deserializeMeter(dis));
    +			for (int x = 0; x < data.numMeters; x++) {
    +				try {
    +					metrics.add(deserializeMeter(in));
    +				} catch (Exception e) {
    +					LOG.warn("Failed to deserialize counter.", e);
    +				}
     			}
    -
    -			return metrics;
    --- End diff --
    
    Code does not compile because of the missing return.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---