You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/09 15:24:45 UTC

[3/4] flink git commit: [javadocs, network] Add javadocs to SpanningRecordSerializer and RecordSerializer

[javadocs, network] Add javadocs to SpanningRecordSerializer and RecordSerializer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/161bab04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/161bab04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/161bab04

Branch: refs/heads/master
Commit: 161bab04965b0110bfa80dcfcd1963f12b3cfda3
Parents: 384da0e
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 17 13:57:18 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 9 16:22:41 2017 +0100

----------------------------------------------------------------------
 .../api/serialization/RecordSerializer.java     | 72 ++++++++++++++++++--
 .../serialization/SpanningRecordSerializer.java | 16 +++++
 .../io/network/api/writer/RecordWriter.java     |  5 +-
 3 files changed, 85 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index e8179dc..5fe56c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -43,30 +43,90 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 			this.isFullRecord = isFullRecord;
 			this.isFullBuffer = isFullBuffer;
 		}
-		
+
+		/**
+		 * Whether the full record was serialized and completely written to
+		 * a target buffer.
+		 *
+		 * @return <tt>true</tt> if the complete record was written
+		 */
 		public boolean isFullRecord() {
 			return this.isFullRecord;
 		}
-		
+
+		/**
+		 * Whether the target buffer is full after the serialization process.
+		 *
+		 * @return <tt>true</tt> if the target buffer is full
+		 */
 		public boolean isFullBuffer() {
 			return this.isFullBuffer;
 		}
 	}
-	
+
+	/**
+	 * Starts serializing and copying the given record to the target buffer
+	 * (if available).
+	 *
+	 * @param record the record to serialize
+	 * @return how much information was written to the target buffer and
+	 *         whether this buffer is full
+	 * @throws IOException
+	 */
 	SerializationResult addRecord(T record) throws IOException;
 
+	/**
+	 * Sets a (next) target buffer to use and continues writing remaining data
+	 * to it until it is full.
+	 *
+	 * @param buffer the new target buffer to use
+	 * @return how much information was written to the target buffer and
+	 *         whether this buffer is full
+	 * @throws IOException
+	 */
 	SerializationResult setNextBuffer(Buffer buffer) throws IOException;
 
+	/**
+	 * Retrieves the current target buffer and sets its size to the actual
+	 * number of written bytes.
+	 *
+	 * After calling this method, a new target buffer is required to continue
+	 * writing (see {@link #setNextBuffer(Buffer)}).
+	 *
+	 * @return the target buffer that was used
+	 */
 	Buffer getCurrentBuffer();
 
+	/**
+	 * Resets the target buffer to <tt>null</tt>.
+	 *
+	 * <p><strong>NOTE:</strong> After calling this method, <strong>a new target
+	 * buffer is required to continue writing</strong> (see
+	 * {@link #setNextBuffer(Buffer)}).</p>
+	 */
 	void clearCurrentBuffer();
-	
+
+	/**
+	 * Resets the target buffer to <tt>null</tt> and resets internal state set
+	 * up for the record to serialize.
+	 *
+	 * <p><strong>NOTE:</strong> After calling this method, a <strong>new record
+	 * and a new target buffer is required to start writing again</strong>
+	 * (see {@link #setNextBuffer(Buffer)}). If you want to continue
+	 * with the current record, use {@link #clearCurrentBuffer()} instead.</p>
+	 */
 	void clear();
-	
+
+	/**
+	 * Determines whether data is left, either in the current target buffer or
+	 * in any internal state set up for the record to serialize.
+	 *
+	 * @return <tt>true</tt> if some data is present
+	 */
 	boolean hasData();
 
 	/**
-	 * Insantiates all metrics.
+	 * Instantiates all metrics.
 	 *
 	 * @param metrics metric group
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index a8fe3fe..335d12e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -29,6 +29,13 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 
+/**
+ * Record serializer which serializes the complete record to an intermediate
+ * data serialization buffer and copies this buffer to target buffers
+ * one-by-one using {@link #setNextBuffer(Buffer)}.
+ *
+ * @param <T>
+ */
 public class SpanningRecordSerializer<T extends IOReadableWritable> implements RecordSerializer<T> {
 
 	/** Flag to enable/disable checks, if buffer not set/full or pending serialization */
@@ -65,6 +72,15 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		this.lengthBuffer.position(4);
 	}
 
+	/**
+	 * Serializes the complete record to an intermediate data serialization
+	 * buffer and starts copying it to the target buffer (if available).
+	 *
+	 * @param record the record to serialize
+	 * @return how much information was written to the target buffer and
+	 *         whether this buffer is full
+	 * @throws IOException
+	 */
 	@Override
 	public SerializationResult addRecord(T record) throws IOException {
 		if (CHECKED) {

http://git-wip-us.apache.org/repos/asf/flink/blob/161bab04/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 799187d..87d34ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -208,9 +208,10 @@ public class RecordWriter<T extends IOReadableWritable> {
 	}
 
 	/**
-	 * Writes the buffer to the {@link ResultPartitionWriter}.
+	 * Writes the buffer to the {@link ResultPartitionWriter} and removes the
+	 * buffer from the serializer state.
 	 *
-	 * <p> The buffer is cleared from the serializer state after a call to this method.
+	 * Needs to be synchronized on the serializer!
 	 */
 	private void writeAndClearBuffer(
 			Buffer buffer,