You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "TisonKun (Jira)" <ji...@apache.org> on 2019/09/06 10:20:00 UTC
[jira] [Created] (FLINK-13992) Refactor Optional parameter in
InputGateWithMetrics#updateMetrics
TisonKun created FLINK-13992:
--------------------------------
Summary: Refactor Optional parameter in InputGateWithMetrics#updateMetrics
Key: FLINK-13992
URL: https://issues.apache.org/jira/browse/FLINK-13992
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: TisonKun
Fix For: 1.10.0
As consensus from community code style discussion, in {{InputGateWithMetrics#updateMetrics}} we can refactor to reduce the usage of Optional parameter.
cc [~azagrebin]
{code:java}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 5d2cfd95c4..e548fbf02b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -67,12 +69,12 @@ public class InputGateWithMetrics extends InputGate {
@Override
public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
- return updateMetrics(inputGate.getNext());
+ return inputGate.getNext().map(this::updateMetrics);
}
@Override
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
- return updateMetrics(inputGate.pollNext());
+ return inputGate.pollNext().map(this::updateMetrics);
}
@Override
@@ -85,8 +87,8 @@ public class InputGateWithMetrics extends InputGate {
inputGate.close();
}
- private Optional<BufferOrEvent> updateMetrics(Optional<BufferOrEvent> bufferOrEvent) {
- bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize()));
+ private BufferOrEvent updateMetrics(@Nonnull BufferOrEvent bufferOrEvent) {
+ numBytesIn.inc(bufferOrEvent.getSize());
return bufferOrEvent;
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.2#803003)