Posted to issues@flink.apache.org by rmetzger <gi...@git.apache.org> on 2016/07/06 10:29:46 UTC

[GitHub] flink pull request #2206: [FLINK-4157] Catch Kafka metrics serialization exc...

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/2206

    [FLINK-4157] Catch Kafka metrics serialization exceptions

    The metrics are an optional feature, so they should not cause exceptions or failures for the system.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink4157

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2206.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2206
    
----
commit 8aed84f948ece3ed098bd3e66eacf995d701aa82
Author: Robert Metzger <rm...@apache.org>
Date:   2016-07-06T10:28:55Z

    [FLINK-4157] Catch Kafka metrics serialization exceptions

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2206: [FLINK-4157] Catch Kafka metrics serialization exc...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2206#discussion_r69717154
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java ---
    @@ -127,12 +127,17 @@ private static AvgSumCount getAvgSumCount(Avg avg) {
     	}
     
     	private void writeObject(ObjectOutputStream out) throws IOException {
    -		Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
    -		if(!(thisMeasurable instanceof Avg) ) {
    -			throw new RuntimeException("Must be of type Avg");
    +		try {
    +			Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
    +			if(!(thisMeasurable instanceof Avg) ) {
    +				throw new RuntimeException("Must be of type Avg");
    +			}
    +			this.lastSumCount = getAvgSumCount((Avg) thisMeasurable);
    +			out.defaultWriteObject();
    +		} catch(Exception e) {
    +			// lol non-fatal error.
    --- End diff --
    
    log ;)



[GitHub] flink issue #2206: [FLINK-4157] Catch Kafka metrics serialization exceptions

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2206
  
    Merging ...



[GitHub] flink issue #2206: [FLINK-4157] Catch Kafka metrics serialization exceptions

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2206
  
    I think this will flood the log with warnings as soon as there is one non-serializable type, because the exception is logged on every serialization attempt.
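
One way to address the flooding concern is to report the failure only the first time it occurs. The following is a minimal sketch, not the change that was made in this PR: `OnceOnlyWarning` and its methods are hypothetical names, and `System.err` stands in for a real logger.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of Flink): emits a warning at most once,
// no matter how many times the same failure is reported.
class OnceOnlyWarning {
    private final AtomicBoolean warned = new AtomicBoolean(false);
    private final AtomicInteger emitted = new AtomicInteger();

    // Prints the warning only for the first call; later calls are ignored.
    void warn(String message, Throwable cause) {
        if (warned.compareAndSet(false, true)) {
            emitted.incrementAndGet();
            // System.err is an assumption standing in for the project's logger.
            System.err.println("WARN " + message + ": " + cause);
        }
    }

    // Number of warnings actually written (0 or 1).
    int emittedCount() {
        return emitted.get();
    }
}
```

Calling `warn` from the catch block on every failed serialization attempt would then produce a single log line, however often the metric is serialized.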




[GitHub] flink issue #2206: [FLINK-4157] Catch Kafka metrics serialization exceptions

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2206
  
    I updated the PR



[GitHub] flink pull request #2206: [FLINK-4157] Catch Kafka metrics serialization exc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2206



[GitHub] flink pull request #2206: [FLINK-4157] Catch Kafka metrics serialization exc...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2206#discussion_r69717001
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java ---
    @@ -127,12 +127,17 @@ private static AvgSumCount getAvgSumCount(Avg avg) {
     	}
     
     	private void writeObject(ObjectOutputStream out) throws IOException {
    -		Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
    -		if(!(thisMeasurable instanceof Avg) ) {
    -			throw new RuntimeException("Must be of type Avg");
    +		try {
    +			Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
    +			if(!(thisMeasurable instanceof Avg) ) {
    +				throw new RuntimeException("Must be of type Avg");
    +			}
    +			this.lastSumCount = getAvgSumCount((Avg) thisMeasurable);
    +			out.defaultWriteObject();
    +		} catch(Exception e) {
    --- End diff --
    
    A `RuntimeException` is also an `Exception`. Do you really want to catch this exception in the catch clause?
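
To illustrate the point about `catch(Exception e)`: in the sketch below, the hypothetical class `CatchScopeDemo` uses a `write` method as a stand-in for the body of `writeObject`. It shows that the broad clause also swallows the "Must be of type Avg" `RuntimeException`, whereas a clause limited to `IOException` lets it propagate.

```java
import java.io.IOException;

// Hypothetical demo class; write() stands in for the body of writeObject().
class CatchScopeDemo {
    static void write(boolean isAvg) throws IOException {
        if (!isAvg) {
            throw new RuntimeException("Must be of type Avg");
        }
    }

    // catch (Exception) also catches the RuntimeException thrown in the try block.
    static String broadCatch() {
        try {
            write(false);
            return "written";
        } catch (Exception e) {
            return "swallowed: " + e.getMessage();
        }
    }

    // catch (IOException) does not match, so the RuntimeException reaches the caller.
    static String narrowCatch() {
        try {
            write(false);
            return "written";
        } catch (IOException e) {
            return "swallowed: " + e.getMessage();
        }
    }
}
```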



[GitHub] flink pull request #2206: [FLINK-4157] Catch Kafka metrics serialization exc...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2206#discussion_r69716824
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java ---
    @@ -127,12 +127,17 @@ private static AvgSumCount getAvgSumCount(Avg avg) {
     	}
     
     	private void writeObject(ObjectOutputStream out) throws IOException {
    -		Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
    -		if(!(thisMeasurable instanceof Avg) ) {
    -			throw new RuntimeException("Must be of type Avg");
    +		try {
    +			Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
    +			if(!(thisMeasurable instanceof Avg) ) {
    +				throw new RuntimeException("Must be of type Avg");
    +			}
    +			this.lastSumCount = getAvgSumCount((Avg) thisMeasurable);
    +			out.defaultWriteObject();
    +		} catch(Exception e) {
    +			// lol non-fatal error.
    --- End diff --
    
    lol?



[GitHub] flink issue #2206: [FLINK-4157] Catch Kafka metrics serialization exceptions

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2206
  
    Since the types serialized by the method are not user-provided but come from Kafka, I think that is an unlikely scenario.
    
    I looked through the stack trace reported in the JIRA again, and we can also catch the exception in the `AccumulatorRegistry.getSnapshot()` method by catching all exceptions, not just `IOException`s.
    
    I'll update the PR.
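
A rough sketch of the idea of catching all exceptions at the snapshot level, assuming each accumulator is serialized independently. `SnapshotSketch`, `BrokenMetric`, and `snapshot` are hypothetical names and do not reproduce Flink's `AccumulatorRegistry`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; this is not Flink's AccumulatorRegistry.
class SnapshotSketch {

    // Simulates a metric whose custom serialization fails with an unchecked exception.
    static class BrokenMetric implements Serializable {
        private static final long serialVersionUID = 1L;

        private void writeObject(ObjectOutputStream out) throws IOException {
            throw new RuntimeException("Must be of type Avg");
        }
    }

    // Serializes each entry on its own; an entry that fails for any reason is skipped,
    // so one bad metric cannot fail the whole snapshot.
    static Map<String, byte[]> snapshot(Map<String, Serializable> accumulators) {
        Map<String, byte[]> result = new HashMap<>();
        for (Map.Entry<String, Serializable> entry : accumulators.entrySet()) {
            try {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                    out.writeObject(entry.getValue());
                }
                result.put(entry.getKey(), bytes.toByteArray());
            } catch (Exception e) {
                // Catching Exception (not only IOException) also covers RuntimeExceptions.
                // A real implementation would log this instead of dropping it silently.
            }
        }
        return result;
    }
}
```

With this shape, a failing metric is dropped from the snapshot while the remaining accumulators are still reported.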

