You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/31 20:25:05 UTC
[flink] branch master updated: [FLINK-12642][network][metrics] Fix
In/OutputBufferPoolUsageGauge failure with NPE
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bc16485 [FLINK-12642][network][metrics] Fix In/OutputBufferPoolUsageGauge failure with NPE
bc16485 is described below
commit bc16485cc89fbe5b0dd1534737d0b5cd1ced885b
Author: azagrebin <az...@users.noreply.github.com>
AuthorDate: Fri May 31 22:24:54 2019 +0200
[FLINK-12642][network][metrics] Fix In/OutputBufferPoolUsageGauge failure with NPE
The result partition metrics are initialised before `ResultPartitiion#setup` was called. If a reporter tries to access a In/OutputBufferPoolUsageGauge in between it will fail with an `NullPointerException` since the `BufferPool` of the partition is still `null`. Currently, the quick fix is to return zero metrics until the `BufferPool` is initialised. When we have a single-threaded access from `Task#run`, we can merge partition/gate create and setup then it should not be the case anymore.
---
.../runtime/io/network/metrics/InputBufferPoolUsageGauge.java | 8 ++++++--
.../runtime/io/network/metrics/OutputBufferPoolUsageGauge.java | 9 +++++++--
2 files changed, 13 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
index 992f561..c7a6d4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.metrics;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
/**
@@ -38,8 +39,11 @@ public class InputBufferPoolUsageGauge implements Gauge<Float> {
int bufferPoolSize = 0;
for (SingleInputGate inputGate : inputGates) {
- usedBuffers += inputGate.getBufferPool().bestEffortGetNumOfUsedBuffers();
- bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
+ BufferPool bufferPool = inputGate.getBufferPool();
+ if (bufferPool != null) {
+ usedBuffers += bufferPool.bestEffortGetNumOfUsedBuffers();
+ bufferPoolSize += bufferPool.getNumBuffers();
+ }
}
if (bufferPoolSize != 0) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBufferPoolUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBufferPoolUsageGauge.java
index 9aad92c..b8f771b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBufferPoolUsageGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBufferPoolUsageGauge.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.metrics;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
/**
@@ -38,8 +39,12 @@ public class OutputBufferPoolUsageGauge implements Gauge<Float> {
int bufferPoolSize = 0;
for (ResultPartition resultPartition : resultPartitions) {
- usedBuffers += resultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers();
- bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
+ BufferPool bufferPool = resultPartition.getBufferPool();
+
+ if (bufferPool != null) {
+ usedBuffers += bufferPool.bestEffortGetNumOfUsedBuffers();
+ bufferPoolSize += bufferPool.getNumBuffers();
+ }
}
if (bufferPoolSize != 0) {