You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/04 13:41:25 UTC
flink git commit: [FLINK-7395] [metrics] Count bytesIn/Out without synchronization
Repository: flink
Updated Branches:
refs/heads/master aa1f83333 -> eda583092
[FLINK-7395] [metrics] Count bytesIn/Out without synchronization
This closes #4504.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eda58309
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eda58309
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eda58309
Branch: refs/heads/master
Commit: eda583092481a21c613ef689590eeb7f669f8eef
Parents: aa1f833
Author: zentol <ch...@apache.org>
Authored: Wed Aug 9 14:43:51 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 4 14:41:16 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/io/network/api/writer/RecordWriter.java | 6 +++---
.../org/apache/flink/runtime/io/network/buffer/Buffer.java | 9 +++++++++
.../io/network/partition/consumer/LocalInputChannel.java | 2 +-
.../io/network/partition/consumer/RemoteInputChannel.java | 2 +-
4 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/eda58309/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index c698ff5..623dc62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -117,7 +117,7 @@ public class RecordWriter<T extends IOReadableWritable> {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) {
- numBytesOut.inc(buffer.getSize());
+ numBytesOut.inc(buffer.getSizeUnsafe());
writeAndClearBuffer(buffer, targetChannel, serializer);
// If this was a full record, we are done. Not breaking
@@ -145,7 +145,7 @@ public class RecordWriter<T extends IOReadableWritable> {
synchronized (serializer) {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) {
- numBytesOut.inc(buffer.getSize());
+ numBytesOut.inc(buffer.getSizeUnsafe());
writeAndClearBuffer(buffer, targetChannel, serializer);
} else if (serializer.hasData()) {
// sanity check
@@ -173,7 +173,7 @@ public class RecordWriter<T extends IOReadableWritable> {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) {
- numBytesOut.inc(buffer.getSize());
+ numBytesOut.inc(buffer.getSizeUnsafe());
targetPartition.writeBuffer(buffer, targetChannel);
}
} finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/eda58309/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index dbdf17f..d7980d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -93,6 +93,15 @@ public class Buffer {
return recycler;
}
+ /**
+ * Returns the size of this buffer without synchronization.
+ *
+ * @return size of this buffer
+ */
+ public int getSizeUnsafe() {
+ return currentSize;
+ }
+
public int getSize() {
synchronized (recycleLock) {
return currentSize;
http://git-wip-us.apache.org/repos/asf/flink/blob/eda58309/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 3ade2f8..71b3653 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -195,7 +195,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
long remaining = numBuffersAvailable.decrementAndGet();
if (remaining >= 0) {
- numBytesIn.inc(next.getSize());
+ numBytesIn.inc(next.getSizeUnsafe());
return new BufferAndAvailability(next, remaining > 0);
} else if (subpartitionView.isReleased()) {
throw new ProducerFailedException(subpartitionView.getFailureCause());
http://git-wip-us.apache.org/repos/asf/flink/blob/eda58309/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 4c156df..cd00934 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -188,7 +188,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
remaining = receivedBuffers.size();
}
- numBytesIn.inc(next.getSize());
+ numBytesIn.inc(next.getSizeUnsafe());
return new BufferAndAvailability(next, remaining > 0);
}