You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/05/29 12:51:57 UTC

flink git commit: [runtime] Clear serializer state when clearing RecordWriter

Repository: flink
Updated Branches:
  refs/heads/master d594d0242 -> 5a7ceda61


[runtime] Clear serializer state when clearing RecordWriter


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a7ceda6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a7ceda6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a7ceda6

Branch: refs/heads/master
Commit: 5a7ceda61227336115723da969ee649202a8dbb6
Parents: d594d02
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri May 29 12:51:39 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri May 29 12:51:39 2015 +0200

----------------------------------------------------------------------
 .../io/network/api/writer/RecordWriter.java     | 10 ++++++---
 .../io/network/api/writer/RecordWriterTest.java | 22 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5a7ceda6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 58f96a0..4381fd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -162,9 +162,13 @@ public class RecordWriter<T extends IOReadableWritable> {
 	public void clearBuffers() {
 		if (serializers != null) {
 			for (RecordSerializer<?> s : serializers) {
-				Buffer b = s.getCurrentBuffer();
-				if (b != null) {
-					b.recycle();
+				synchronized (s) {
+					Buffer b = s.getCurrentBuffer();
+					s.clear();
+
+					if (b != null) {
+						b.recycle();
+					}
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5a7ceda6/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index bfe9931..7061fb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -241,6 +241,28 @@ public class RecordWriterTest {
 		}
 	}
 
+	@Test
+	public void testSerializerClearedAfterClearBuffers() throws Exception {
+
+		final Buffer buffer = TestBufferFactory.createBuffer(16);
+
+		ResultPartitionWriter partitionWriter = createResultPartitionWriter(
+				createBufferProvider(buffer));
+
+		RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
+
+		// Fill a buffer, but don't write it out.
+		recordWriter.emit(new IntValue(0));
+		verify(partitionWriter, never()).writeBuffer(any(Buffer.class), anyInt());
+
+		// Clear all buffers.
+		recordWriter.clearBuffers();
+
+		// This should not throw an Exception iff the serializer state
+		// has been cleared as expected.
+		recordWriter.flush();
+	}
+
 	// ---------------------------------------------------------------------------------------------
 	// Helpers
 	// ---------------------------------------------------------------------------------------------