You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/08 13:04:00 UTC
[06/15] flink git commit: [hotfix][network] Drop redundant this
reference usages
[hotfix][network] Drop redundant this reference usages
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f60a1de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f60a1de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f60a1de
Branch: refs/heads/master
Commit: 1f60a1de563ccca4ea0309fbd5c6c3531090ddc9
Parents: 1752fdb
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Dec 18 15:26:20 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jan 8 11:46:00 2018 +0100
----------------------------------------------------------------------
.../serialization/SpanningRecordSerializer.java | 76 ++++++++++----------
1 file changed, 38 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1f60a1de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 87b9e4c..7394f83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -59,14 +59,14 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
private int limit;
public SpanningRecordSerializer() {
- this.serializationBuffer = new DataOutputSerializer(128);
+ serializationBuffer = new DataOutputSerializer(128);
- this.lengthBuffer = ByteBuffer.allocate(4);
- this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
+ lengthBuffer = ByteBuffer.allocate(4);
+ lengthBuffer.order(ByteOrder.BIG_ENDIAN);
// ensure initial state with hasRemaining false (for correct setNextBuffer logic)
- this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
- this.lengthBuffer.position(4);
+ dataBuffer = serializationBuffer.wrapAsByteBuffer();
+ lengthBuffer.position(4);
}
/**
@@ -81,50 +81,50 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
@Override
public SerializationResult addRecord(T record) throws IOException {
if (CHECKED) {
- if (this.dataBuffer.hasRemaining()) {
+ if (dataBuffer.hasRemaining()) {
throw new IllegalStateException("Pending serialization of previous record.");
}
}
- this.serializationBuffer.clear();
- this.lengthBuffer.clear();
+ serializationBuffer.clear();
+ lengthBuffer.clear();
// write data and length
- record.write(this.serializationBuffer);
+ record.write(serializationBuffer);
- int len = this.serializationBuffer.length();
- this.lengthBuffer.putInt(0, len);
+ int len = serializationBuffer.length();
+ lengthBuffer.putInt(0, len);
- this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
+ dataBuffer = serializationBuffer.wrapAsByteBuffer();
// Copy from intermediate buffers to current target memory segment
- copyToTargetBufferFrom(this.lengthBuffer);
- copyToTargetBufferFrom(this.dataBuffer);
+ copyToTargetBufferFrom(lengthBuffer);
+ copyToTargetBufferFrom(dataBuffer);
return getSerializationResult();
}
@Override
public SerializationResult setNextBuffer(Buffer buffer) throws IOException {
- this.targetBuffer = buffer;
- this.position = 0;
- this.limit = buffer.getSize();
+ targetBuffer = buffer;
+ position = 0;
+ limit = buffer.getSize();
- if (this.lengthBuffer.hasRemaining()) {
- copyToTargetBufferFrom(this.lengthBuffer);
+ if (lengthBuffer.hasRemaining()) {
+ copyToTargetBufferFrom(lengthBuffer);
}
- if (this.dataBuffer.hasRemaining()) {
- copyToTargetBufferFrom(this.dataBuffer);
+ if (dataBuffer.hasRemaining()) {
+ copyToTargetBufferFrom(dataBuffer);
}
SerializationResult result = getSerializationResult();
// make sure we don't hold onto the large buffers for too long
if (result.isFullRecord()) {
- this.serializationBuffer.clear();
- this.serializationBuffer.pruneBuffer();
- this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
+ serializationBuffer.clear();
+ serializationBuffer.pruneBuffer();
+ dataBuffer = serializationBuffer.wrapAsByteBuffer();
}
return result;
@@ -137,22 +137,22 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
* @param source the {@link ByteBuffer} to copy data from
*/
private void copyToTargetBufferFrom(ByteBuffer source) {
- if (this.targetBuffer == null) {
+ if (targetBuffer == null) {
return;
}
int needed = source.remaining();
- int available = this.limit - this.position;
+ int available = limit - position;
int toCopy = Math.min(needed, available);
- this.targetBuffer.getMemorySegment().put(this.position, source, toCopy);
+ targetBuffer.getMemorySegment().put(position, source, toCopy);
- this.position += toCopy;
+ position += toCopy;
}
private SerializationResult getSerializationResult() {
- if (!this.dataBuffer.hasRemaining() && !this.lengthBuffer.hasRemaining()) {
- return (this.position < this.limit)
+ if (!dataBuffer.hasRemaining() && !lengthBuffer.hasRemaining()) {
+ return (position < limit)
? SerializationResult.FULL_RECORD
: SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
}
@@ -166,8 +166,8 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
return null;
}
- this.targetBuffer.setSize(this.position);
- return this.targetBuffer;
+ targetBuffer.setSize(position);
+ return targetBuffer;
}
@Override
@@ -179,19 +179,19 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
@Override
public void clear() {
- this.targetBuffer = null;
- this.position = 0;
- this.limit = 0;
+ targetBuffer = null;
+ position = 0;
+ limit = 0;
// ensure clear state with hasRemaining false (for correct setNextBuffer logic)
- this.dataBuffer.position(this.dataBuffer.limit());
- this.lengthBuffer.position(4);
+ dataBuffer.position(dataBuffer.limit());
+ lengthBuffer.position(4);
}
@Override
public boolean hasData() {
// either data in current target buffer or intermediate buffers
- return this.position > 0 || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining());
+ return position > 0 || (lengthBuffer.hasRemaining() || dataBuffer.hasRemaining());
}
@Override