You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/05/29 12:51:57 UTC
flink git commit: [runtime] Clear serializer state when clearing RecordWriter
Repository: flink
Updated Branches:
refs/heads/master d594d0242 -> 5a7ceda61
[runtime] Clear serializer state when clearing RecordWriter
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a7ceda6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a7ceda6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a7ceda6
Branch: refs/heads/master
Commit: 5a7ceda61227336115723da969ee649202a8dbb6
Parents: d594d02
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri May 29 12:51:39 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri May 29 12:51:39 2015 +0200
----------------------------------------------------------------------
.../io/network/api/writer/RecordWriter.java | 10 ++++++---
.../io/network/api/writer/RecordWriterTest.java | 22 ++++++++++++++++++++
2 files changed, 29 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5a7ceda6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 58f96a0..4381fd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -162,9 +162,13 @@ public class RecordWriter<T extends IOReadableWritable> {
public void clearBuffers() {
if (serializers != null) {
for (RecordSerializer<?> s : serializers) {
- Buffer b = s.getCurrentBuffer();
- if (b != null) {
- b.recycle();
+ synchronized (s) {
+ Buffer b = s.getCurrentBuffer();
+ s.clear();
+
+ if (b != null) {
+ b.recycle();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5a7ceda6/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index bfe9931..7061fb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -241,6 +241,28 @@ public class RecordWriterTest {
}
}
+ @Test
+ public void testSerializerClearedAfterClearBuffers() throws Exception {
+
+ final Buffer buffer = TestBufferFactory.createBuffer(16);
+
+ ResultPartitionWriter partitionWriter = createResultPartitionWriter(
+ createBufferProvider(buffer));
+
+ RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
+
+ // Fill a buffer, but don't write it out.
+ recordWriter.emit(new IntValue(0));
+ verify(partitionWriter, never()).writeBuffer(any(Buffer.class), anyInt());
+
+ // Clear all buffers.
+ recordWriter.clearBuffers();
+
+ // This should not throw an Exception iff the serializer state
+ // has been cleared as expected.
+ recordWriter.flush();
+ }
+
// ---------------------------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------------------------