You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/31 20:25:05 UTC

[flink] branch master updated: [FLINK-12642][network][metrics] Fix In/OutputBufferPoolUsageGauge failure with NPE

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bc16485  [FLINK-12642][network][metrics] Fix In/OutputBufferPoolUsageGauge failure with NPE
bc16485 is described below

commit bc16485cc89fbe5b0dd1534737d0b5cd1ced885b
Author: azagrebin <az...@users.noreply.github.com>
AuthorDate: Fri May 31 22:24:54 2019 +0200

    [FLINK-12642][network][metrics] Fix In/OutputBufferPoolUsageGauge failure with NPE
    
    The result partition metrics are initialised before `ResultPartition#setup` is called. If a reporter tries to access an In/OutputBufferPoolUsageGauge in between, it will fail with a `NullPointerException` since the `BufferPool` of the partition is still `null`. Currently, the quick fix is to return zero metrics until the `BufferPool` is initialised. Once we have single-threaded access from `Task#run`, we can merge partition/gate creation and setup, and then this should no longer be the case.
---
 .../runtime/io/network/metrics/InputBufferPoolUsageGauge.java    | 8 ++++++--
 .../runtime/io/network/metrics/OutputBufferPoolUsageGauge.java   | 9 +++++++--
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
index 992f561..c7a6d4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.metrics;
 
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 
 /**
@@ -38,8 +39,11 @@ public class InputBufferPoolUsageGauge implements Gauge<Float> {
 		int bufferPoolSize = 0;
 
 		for (SingleInputGate inputGate : inputGates) {
-			usedBuffers += inputGate.getBufferPool().bestEffortGetNumOfUsedBuffers();
-			bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
+			BufferPool bufferPool = inputGate.getBufferPool();
+			if (bufferPool != null) {
+				usedBuffers += bufferPool.bestEffortGetNumOfUsedBuffers();
+				bufferPoolSize += bufferPool.getNumBuffers();
+			}
 		}
 
 		if (bufferPoolSize != 0) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBufferPoolUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBufferPoolUsageGauge.java
index 9aad92c..b8f771b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBufferPoolUsageGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBufferPoolUsageGauge.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.metrics;
 
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 
 /**
@@ -38,8 +39,12 @@ public class OutputBufferPoolUsageGauge implements Gauge<Float> {
 		int bufferPoolSize = 0;
 
 		for (ResultPartition resultPartition : resultPartitions) {
-			usedBuffers += resultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers();
-			bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
+			BufferPool bufferPool = resultPartition.getBufferPool();
+
+			if (bufferPool != null) {
+				usedBuffers += bufferPool.bestEffortGetNumOfUsedBuffers();
+				bufferPoolSize += bufferPool.getNumBuffers();
+			}
 		}
 
 		if (bufferPoolSize != 0) {