You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/04 13:41:25 UTC

flink git commit: [FLINK-7395] [metrics] Count bytesIn/Out without synchronization

Repository: flink
Updated Branches:
  refs/heads/master aa1f83333 -> eda583092


[FLINK-7395] [metrics] Count bytesIn/Out without synchronization

This closes #4504.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eda58309
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eda58309
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eda58309

Branch: refs/heads/master
Commit: eda583092481a21c613ef689590eeb7f669f8eef
Parents: aa1f833
Author: zentol <ch...@apache.org>
Authored: Wed Aug 9 14:43:51 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 4 14:41:16 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/io/network/api/writer/RecordWriter.java   | 6 +++---
 .../org/apache/flink/runtime/io/network/buffer/Buffer.java  | 9 +++++++++
 .../io/network/partition/consumer/LocalInputChannel.java    | 2 +-
 .../io/network/partition/consumer/RemoteInputChannel.java   | 2 +-
 4 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eda58309/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index c698ff5..623dc62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -117,7 +117,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 				Buffer buffer = serializer.getCurrentBuffer();
 
 				if (buffer != null) {
-					numBytesOut.inc(buffer.getSize());
+					numBytesOut.inc(buffer.getSizeUnsafe());
 					writeAndClearBuffer(buffer, targetChannel, serializer);
 
 					// If this was a full record, we are done. Not breaking
@@ -145,7 +145,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 				synchronized (serializer) {
 					Buffer buffer = serializer.getCurrentBuffer();
 					if (buffer != null) {
-						numBytesOut.inc(buffer.getSize());
+						numBytesOut.inc(buffer.getSizeUnsafe());
 						writeAndClearBuffer(buffer, targetChannel, serializer);
 					} else if (serializer.hasData()) {
 						// sanity check
@@ -173,7 +173,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 					Buffer buffer = serializer.getCurrentBuffer();
 
 					if (buffer != null) {
-						numBytesOut.inc(buffer.getSize());
+						numBytesOut.inc(buffer.getSizeUnsafe());
 						targetPartition.writeBuffer(buffer, targetChannel);
 					}
 				} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/eda58309/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index dbdf17f..d7980d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -93,6 +93,15 @@ public class Buffer {
 		return recycler;
 	}
 
+	/**
+	 * Returns the size of this buffer without synchronizing on the recycle lock, unlike {@link #getSize()}.
+	 *
+	 * @return size of this buffer
+	 */
+	public int getSizeUnsafe() {
+		return currentSize;
+	}
+
 	public int getSize() {
 		synchronized (recycleLock) {
 			return currentSize;

http://git-wip-us.apache.org/repos/asf/flink/blob/eda58309/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 3ade2f8..71b3653 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -195,7 +195,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 		long remaining = numBuffersAvailable.decrementAndGet();
 
 		if (remaining >= 0) {
-			numBytesIn.inc(next.getSize());
+			numBytesIn.inc(next.getSizeUnsafe());
 			return new BufferAndAvailability(next, remaining > 0);
 		} else if (subpartitionView.isReleased()) {
 			throw new ProducerFailedException(subpartitionView.getFailureCause());

http://git-wip-us.apache.org/repos/asf/flink/blob/eda58309/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 4c156df..cd00934 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -188,7 +188,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 			remaining = receivedBuffers.size();
 		}
 
-		numBytesIn.inc(next.getSize());
+		numBytesIn.inc(next.getSizeUnsafe());
 		return new BufferAndAvailability(next, remaining > 0);
 	}