You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/09 15:24:45 UTC
[3/4] flink git commit: [javadocs, network] Add javadocs to SpanningRecordSerializer and RecordSerializer
[javadocs, network] Add javadocs to SpanningRecordSerializer and RecordSerializer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/161bab04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/161bab04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/161bab04
Branch: refs/heads/master
Commit: 161bab04965b0110bfa80dcfcd1963f12b3cfda3
Parents: 384da0e
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 17 13:57:18 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 9 16:22:41 2017 +0100
----------------------------------------------------------------------
.../api/serialization/RecordSerializer.java | 72 ++++++++++++++++++--
.../serialization/SpanningRecordSerializer.java | 16 +++++
.../io/network/api/writer/RecordWriter.java | 5 +-
3 files changed, 85 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index e8179dc..5fe56c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -43,30 +43,90 @@ public interface RecordSerializer<T extends IOReadableWritable> {
this.isFullRecord = isFullRecord;
this.isFullBuffer = isFullBuffer;
}
-
+
+ /**
+ * Whether the full record was serialized and completely written to
+ * a target buffer.
+ *
+ * @return <tt>true</tt> if the complete record was written
+ */
public boolean isFullRecord() {
return this.isFullRecord;
}
-
+
+ /**
+ * Whether the target buffer is full after the serialization process.
+ *
+ * @return <tt>true</tt> if the target buffer is full
+ */
public boolean isFullBuffer() {
return this.isFullBuffer;
}
}
-
+
+ /**
+ * Starts serializing and copying the given record to the target buffer
+ * (if available).
+ *
+ * @param record the record to serialize
+ * @return how much information was written to the target buffer and
+ * whether this buffer is full
+ * @throws IOException
+ */
SerializationResult addRecord(T record) throws IOException;
+ /**
+ * Sets a (next) target buffer to use and continues writing remaining data
+ * to it until it is full.
+ *
+ * @param buffer the new target buffer to use
+ * @return how much information was written to the target buffer and
+ * whether this buffer is full
+ * @throws IOException
+ */
SerializationResult setNextBuffer(Buffer buffer) throws IOException;
+ /**
+ * Retrieves the current target buffer and sets its size to the actual
+ * number of written bytes.
+ *
+ * After calling this method, a new target buffer is required to continue
+ * writing (see {@link #setNextBuffer(Buffer)}).
+ *
+ * @return the target buffer that was used
+ */
Buffer getCurrentBuffer();
+ /**
+ * Resets the target buffer to <tt>null</tt>.
+ *
+ * <p><strong>NOTE:</strong> After calling this method, <strong>a new target
+ * buffer is required to continue writing</strong> (see
+ * {@link #setNextBuffer(Buffer)}).</p>
+ */
void clearCurrentBuffer();
-
+
+ /**
+ * Resets the target buffer to <tt>null</tt> and resets internal state set
+ * up for the record to serialize.
+ *
+ * <p><strong>NOTE:</strong> After calling this method, a <strong>new record
+ * and a new target buffer are required to start writing again</strong>
+ * (see {@link #setNextBuffer(Buffer)}). If you want to continue
+ * with the current record, use {@link #clearCurrentBuffer()} instead.</p>
+ */
void clear();
-
+
+ /**
+ * Determines whether data is left, either in the current target buffer or
+ * in any internal state set up for the record to serialize.
+ *
+ * @return <tt>true</tt> if some data is present
+ */
boolean hasData();
/**
- * Insantiates all metrics.
+ * Instantiates all metrics.
*
* @param metrics metric group
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index a8fe3fe..335d12e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -29,6 +29,13 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataOutputSerializer;
+/**
+ * Record serializer which serializes the complete record to an intermediate
+ * data serialization buffer and copies this buffer to target buffers
+ * one-by-one using {@link #setNextBuffer(Buffer)}.
+ *
+ * @param <T> the type of the records that are serialized
+ */
public class SpanningRecordSerializer<T extends IOReadableWritable> implements RecordSerializer<T> {
/** Flag to enable/disable checks, if buffer not set/full or pending serialization */
@@ -65,6 +72,15 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
this.lengthBuffer.position(4);
}
+ /**
+ * Serializes the complete record to an intermediate data serialization
+ * buffer and starts copying it to the target buffer (if available).
+ *
+ * @param record the record to serialize
+ * @return how much information was written to the target buffer and
+ * whether this buffer is full
+ * @throws IOException
+ */
@Override
public SerializationResult addRecord(T record) throws IOException {
if (CHECKED) {
http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 799187d..87d34ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -208,9 +208,10 @@ public class RecordWriter<T extends IOReadableWritable> {
}
/**
- * Writes the buffer to the {@link ResultPartitionWriter}.
+ * Writes the buffer to the {@link ResultPartitionWriter} and removes the
+ * buffer from the serializer state.
*
- * <p> The buffer is cleared from the serializer state after a call to this method.
+ * Needs to be synchronized on the serializer!
*/
private void writeAndClearBuffer(
Buffer buffer,