You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/19 15:12:02 UTC

[flink] 02/10: [FLINK-17547][task][hotfix] Extract NonSpanningWrapper from SpillingAdaptiveSpanningRecordDeserializer (static inner class) As it is, no logical changes.

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39f5f1b0f09c37400ba113fdf33f90a832de5f0d
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed May 6 17:54:05 2020 +0200

    [FLINK-17547][task][hotfix] Extract NonSpanningWrapper from
    SpillingAdaptiveSpanningRecordDeserializer (static inner class)
    As it is, no logical changes.
---
 .../api/serialization/NonSpanningWrapper.java      | 296 +++++++++++++++++++++
 ...SpillingAdaptiveSpanningRecordDeserializer.java | 271 -------------------
 2 files changed, 296 insertions(+), 271 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
new file mode 100644
index 0000000..bab50fa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.util.Optional;
+
+final class NonSpanningWrapper implements DataInputView { // cursor-style DataInputView over a single MemorySegment: reads start at position, and remaining() is measured against limit
+
+	MemorySegment segment; // backing segment; set to null by clear()
+
+	private int limit; // end offset; used by remaining(), and therefore by skipBytes()/read(), to cap progress
+
+	int position; // offset in segment of the next byte to read
+
+	private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
+	private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
+
+	int remaining() { // number of bytes between position and limit
+		return this.limit - this.position;
+	}
+
+	void clear() { // drops the segment reference and resets both offsets to 0; the utf buffers are kept
+		this.segment = null;
+		this.limit = 0;
+		this.position = 0;
+	}
+
+	void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) { // re-targets this wrapper at seg: reading starts at position, leftOverLimit becomes the limit
+		this.segment = seg;
+		this.position = position;
+		this.limit = leftOverLimit;
+	}
+
+	Optional<MemorySegment> getUnconsumedSegment() { // copy of the unread bytes, or empty when nothing is left; position is not advanced
+		if (remaining() == 0) {
+			return Optional.empty();
+		}
+		MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining()); // new segment sized exactly to the unread bytes
+		segment.copyTo(position, target, 0, remaining());
+		return Optional.of(target);
+	}
+
+	// -------------------------------------------------------------------------------------------------------------
+	//                                       DataInput specific methods
+	// -------------------------------------------------------------------------------------------------------------
+
+	@Override
+	public final void readFully(byte[] b) throws IOException { // fills all of b; delegates to the ranged overload
+		readFully(b, 0, b.length);
+	}
+
+	@Override
+	public final void readFully(byte[] b, int off, int len) throws IOException { // copies len bytes into b[off..off+len) and advances position by len
+		if (off < 0 || len < 0 || off + len > b.length) { // validates the destination range only; NOTE(review): off + len can overflow int for huge arguments
+			throw new IndexOutOfBoundsException();
+		}
+
+		this.segment.get(this.position, b, off, len); // limit is not consulted here; NOTE(review): presumably relies on the segment's own bounds check - confirm
+		this.position += len;
+	}
+
+	@Override
+	public final boolean readBoolean() throws IOException { // true only when the next byte equals 1
+		return readByte() == 1;
+	}
+
+	@Override
+	public final byte readByte() throws IOException { // returns the byte at position and advances position by 1
+		return this.segment.get(this.position++);
+	}
+
+	@Override
+	public final int readUnsignedByte() throws IOException { // next byte widened to the range 0..255
+		return readByte() & 0xff;
+	}
+
+	@Override
+	public final short readShort() throws IOException { // big-endian 16-bit value; advances position by 2
+		final short v = this.segment.getShortBigEndian(this.position);
+		this.position += 2;
+		return v;
+	}
+
+	@Override
+	public final int readUnsignedShort() throws IOException { // big-endian 16-bit value widened to 0..65535; advances position by 2
+		final int v = this.segment.getShortBigEndian(this.position) & 0xffff; // mask removes the sign extension of the short
+		this.position += 2;
+		return v;
+	}
+
+	@Override
+	public final char readChar() throws IOException  { // big-endian 16-bit char; advances position by 2
+		final char v = this.segment.getCharBigEndian(this.position);
+		this.position += 2;
+		return v;
+	}
+
+	@Override
+	public final int readInt() throws IOException { // big-endian 32-bit value; advances position by 4
+		final int v = this.segment.getIntBigEndian(this.position);
+		this.position += 4;
+		return v;
+	}
+
+	@Override
+	public final long readLong() throws IOException { // big-endian 64-bit value; advances position by 8
+		final long v = this.segment.getLongBigEndian(this.position);
+		this.position += 8;
+		return v;
+	}
+
+	@Override
+	public final float readFloat() throws IOException { // reads an int and reinterprets its bits as a float
+		return Float.intBitsToFloat(readInt());
+	}
+
+	@Override
+	public final double readDouble() throws IOException { // reads a long and reinterprets its bits as a double
+		return Double.longBitsToDouble(readLong());
+	}
+
+	@Override
+	public final String readLine() throws IOException { // collects bytes as chars up to the next '\n', dropping every '\r'; returns null when nothing was collected
+		final StringBuilder bld = new StringBuilder(32);
+
+		try {
+			int b;
+			while ((b = readUnsignedByte()) != '\n') { // the terminating '\n' is consumed but not appended
+				if (b != '\r') {
+					bld.append((char) b); // each byte becomes one char (0..255), no charset decoding
+				}
+			}
+		}
+		catch (EOFException ignored) {} // treated as end of line; NOTE(review): no visible call in this class throws EOFException - confirm against MemorySegment.get
+
+		if (bld.length() == 0) { // also taken for an empty line (immediate '\n'), which therefore yields null rather than ""
+			return null;
+		}
+
+		// trim a trailing carriage return
+		int len = bld.length();
+		if (len > 0 && bld.charAt(len - 1) == '\r') { // NOTE(review): '\r' is never appended by the loop above, so this branch looks unreachable
+			bld.setLength(len - 1);
+		}
+		return bld.toString();
+	}
+
+	@Override
+	public final String readUTF() throws IOException { // reads a 2-byte unsigned length, then decodes that many bytes of 1-, 2- and 3-byte UTF-8 sequences into a String
+		final int utflen = readUnsignedShort(); // encoded length in bytes, 0..65535
+
+		final byte[] bytearr;
+		final char[] chararr;
+
+		if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) { // grow the cached byte buffer only when it is too small
+			bytearr = new byte[utflen];
+			this.utfByteBuffer = bytearr;
+		} else {
+			bytearr = this.utfByteBuffer;
+		}
+		if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) { // utflen chars always suffice: every sequence yields one char from at least one byte
+			chararr = new char[utflen];
+			this.utfCharBuffer = chararr;
+		} else {
+			chararr = this.utfCharBuffer;
+		}
+
+		int c, char2, char3;
+		int count = 0; // index of the next encoded byte in bytearr
+		int chararrCount = 0; // number of chars decoded so far
+
+		readFully(bytearr, 0, utflen);
+
+		while (count < utflen) { // fast path: copy the leading run of single-byte (<= 127) characters
+			c = (int) bytearr[count] & 0xff;
+			if (c > 127) {
+				break; // first multi-byte lead found; continue in the general loop below
+			}
+			count++;
+			chararr[chararrCount++] = (char) c;
+		}
+
+		while (count < utflen) { // general path: dispatch on the high nibble of the lead byte
+			c = (int) bytearr[count] & 0xff;
+			switch (c >> 4) {
+			case 0:
+			case 1:
+			case 2:
+			case 3:
+			case 4:
+			case 5:
+			case 6:
+			case 7: // 0xxxxxxx: single-byte character
+				count++;
+				chararr[chararrCount++] = (char) c;
+				break;
+			case 12:
+			case 13: // 110xxxxx 10xxxxxx: two-byte sequence
+				count += 2;
+				if (count > utflen) { // the sequence would run past the declared length
+					throw new UTFDataFormatException("malformed input: partial character at end");
+				}
+				char2 = (int) bytearr[count - 1]; // sign-extended, but only masked low bits are used below
+				if ((char2 & 0xC0) != 0x80) { // continuation byte must look like 10xxxxxx
+					throw new UTFDataFormatException("malformed input around byte " + count);
+				}
+				chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); // 5 payload bits from the lead, 6 from the continuation
+				break;
+			case 14: // 1110xxxx 10xxxxxx 10xxxxxx: three-byte sequence
+				count += 3;
+				if (count > utflen) { // the sequence would run past the declared length
+					throw new UTFDataFormatException("malformed input: partial character at end");
+				}
+				char2 = (int) bytearr[count - 2];
+				char3 = (int) bytearr[count - 1];
+				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { // both continuation bytes must look like 10xxxxxx
+					throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+				}
+				chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F)); // 4 + 6 + 6 payload bits
+				break;
+			default: // high nibble 8..11 (continuation byte used as lead) or 15 (longer sequence): rejected
+				throw new UTFDataFormatException("malformed input around byte " + count);
+			}
+		}
+		// The number of chars produced may be less than utflen
+		return new String(chararr, 0, chararrCount);
+	}
+
+	@Override
+	public final int skipBytes(int n) throws IOException { // advances by at most n bytes, capped at remaining(); returns the number actually skipped
+		if (n < 0) {
+			throw new IllegalArgumentException();
+		}
+
+		int toSkip = Math.min(n, remaining());
+		this.position += toSkip;
+		return toSkip;
+	}
+
+	@Override
+	public void skipBytesToRead(int numBytes) throws IOException { // like skipBytes, but throws EOFException when fewer than numBytes could be skipped
+		int skippedBytes = skipBytes(numBytes); // position has already been advanced by the time the check below runs
+
+		if (skippedBytes < numBytes){
+			throw new EOFException("Could not skip " + numBytes + " bytes.");
+		}
+	}
+
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException { // copies up to len bytes into b starting at off; returns the count copied, which is 0 when nothing remains
+		if (b == null){
+			throw new NullPointerException("Byte array b cannot be null.");
+		}
+
+		if (off < 0){
+			throw new IllegalArgumentException("The offset off cannot be negative.");
+		}
+
+		if (len < 0){
+			throw new IllegalArgumentException("The length len cannot be negative.");
+		}
+
+		int toRead = Math.min(len, remaining()); // never reads past limit
+		this.segment.get(this.position, b, off, toRead); // off + toRead is not checked against b.length here; NOTE(review): presumably segment.get rejects an out-of-range destination - confirm
+		this.position += toRead;
+
+		return toRead;
+	}
+
+	@Override
+	public int read(byte[] b) throws IOException { // tries to fill all of b; delegates to the ranged overload
+		return read(b, 0, b.length);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 346bdfc..5003e78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -32,12 +32,10 @@ import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.io.UTFDataFormatException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
@@ -184,275 +182,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 	// -----------------------------------------------------------------------------------------------------------------
 
-	private static final class NonSpanningWrapper implements DataInputView {
-
-		private MemorySegment segment;
-
-		private int limit;
-
-		private int position;
-
-		private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
-		private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
-
-		int remaining() {
-			return this.limit - this.position;
-		}
-
-		void clear() {
-			this.segment = null;
-			this.limit = 0;
-			this.position = 0;
-		}
-
-		void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
-			this.segment = seg;
-			this.position = position;
-			this.limit = leftOverLimit;
-		}
-
-		Optional<MemorySegment> getUnconsumedSegment() {
-			if (remaining() == 0) {
-				return Optional.empty();
-			}
-			MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining());
-			segment.copyTo(position, target, 0, remaining());
-			return Optional.of(target);
-		}
-
-		// -------------------------------------------------------------------------------------------------------------
-		//                                       DataInput specific methods
-		// -------------------------------------------------------------------------------------------------------------
-
-		@Override
-		public final void readFully(byte[] b) throws IOException {
-			readFully(b, 0, b.length);
-		}
-
-		@Override
-		public final void readFully(byte[] b, int off, int len) throws IOException {
-			if (off < 0 || len < 0 || off + len > b.length) {
-				throw new IndexOutOfBoundsException();
-			}
-
-			this.segment.get(this.position, b, off, len);
-			this.position += len;
-		}
-
-		@Override
-		public final boolean readBoolean() throws IOException {
-			return readByte() == 1;
-		}
-
-		@Override
-		public final byte readByte() throws IOException {
-			return this.segment.get(this.position++);
-		}
-
-		@Override
-		public final int readUnsignedByte() throws IOException {
-			return readByte() & 0xff;
-		}
-
-		@Override
-		public final short readShort() throws IOException {
-			final short v = this.segment.getShortBigEndian(this.position);
-			this.position += 2;
-			return v;
-		}
-
-		@Override
-		public final int readUnsignedShort() throws IOException {
-			final int v = this.segment.getShortBigEndian(this.position) & 0xffff;
-			this.position += 2;
-			return v;
-		}
-
-		@Override
-		public final char readChar() throws IOException  {
-			final char v = this.segment.getCharBigEndian(this.position);
-			this.position += 2;
-			return v;
-		}
-
-		@Override
-		public final int readInt() throws IOException {
-			final int v = this.segment.getIntBigEndian(this.position);
-			this.position += 4;
-			return v;
-		}
-
-		@Override
-		public final long readLong() throws IOException {
-			final long v = this.segment.getLongBigEndian(this.position);
-			this.position += 8;
-			return v;
-		}
-
-		@Override
-		public final float readFloat() throws IOException {
-			return Float.intBitsToFloat(readInt());
-		}
-
-		@Override
-		public final double readDouble() throws IOException {
-			return Double.longBitsToDouble(readLong());
-		}
-
-		@Override
-		public final String readLine() throws IOException {
-			final StringBuilder bld = new StringBuilder(32);
-
-			try {
-				int b;
-				while ((b = readUnsignedByte()) != '\n') {
-					if (b != '\r') {
-						bld.append((char) b);
-					}
-				}
-			}
-			catch (EOFException ignored) {}
-
-			if (bld.length() == 0) {
-				return null;
-			}
-
-			// trim a trailing carriage return
-			int len = bld.length();
-			if (len > 0 && bld.charAt(len - 1) == '\r') {
-				bld.setLength(len - 1);
-			}
-			return bld.toString();
-		}
-
-		@Override
-		public final String readUTF() throws IOException {
-			final int utflen = readUnsignedShort();
-
-			final byte[] bytearr;
-			final char[] chararr;
-
-			if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
-				bytearr = new byte[utflen];
-				this.utfByteBuffer = bytearr;
-			} else {
-				bytearr = this.utfByteBuffer;
-			}
-			if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
-				chararr = new char[utflen];
-				this.utfCharBuffer = chararr;
-			} else {
-				chararr = this.utfCharBuffer;
-			}
-
-			int c, char2, char3;
-			int count = 0;
-			int chararrCount = 0;
-
-			readFully(bytearr, 0, utflen);
-
-			while (count < utflen) {
-				c = (int) bytearr[count] & 0xff;
-				if (c > 127) {
-					break;
-				}
-				count++;
-				chararr[chararrCount++] = (char) c;
-			}
-
-			while (count < utflen) {
-				c = (int) bytearr[count] & 0xff;
-				switch (c >> 4) {
-				case 0:
-				case 1:
-				case 2:
-				case 3:
-				case 4:
-				case 5:
-				case 6:
-				case 7:
-					count++;
-					chararr[chararrCount++] = (char) c;
-					break;
-				case 12:
-				case 13:
-					count += 2;
-					if (count > utflen) {
-						throw new UTFDataFormatException("malformed input: partial character at end");
-					}
-					char2 = (int) bytearr[count - 1];
-					if ((char2 & 0xC0) != 0x80) {
-						throw new UTFDataFormatException("malformed input around byte " + count);
-					}
-					chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
-					break;
-				case 14:
-					count += 3;
-					if (count > utflen) {
-						throw new UTFDataFormatException("malformed input: partial character at end");
-					}
-					char2 = (int) bytearr[count - 2];
-					char3 = (int) bytearr[count - 1];
-					if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
-						throw new UTFDataFormatException("malformed input around byte " + (count - 1));
-					}
-					chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
-					break;
-				default:
-					throw new UTFDataFormatException("malformed input around byte " + count);
-				}
-			}
-			// The number of chars produced may be less than utflen
-			return new String(chararr, 0, chararrCount);
-		}
-
-		@Override
-		public final int skipBytes(int n) throws IOException {
-			if (n < 0) {
-				throw new IllegalArgumentException();
-			}
-
-			int toSkip = Math.min(n, remaining());
-			this.position += toSkip;
-			return toSkip;
-		}
-
-		@Override
-		public void skipBytesToRead(int numBytes) throws IOException {
-			int skippedBytes = skipBytes(numBytes);
-
-			if (skippedBytes < numBytes){
-				throw new EOFException("Could not skip " + numBytes + " bytes.");
-			}
-		}
-
-		@Override
-		public int read(byte[] b, int off, int len) throws IOException {
-			if (b == null){
-				throw new NullPointerException("Byte array b cannot be null.");
-			}
-
-			if (off < 0){
-				throw new IllegalArgumentException("The offset off cannot be negative.");
-			}
-
-			if (len < 0){
-				throw new IllegalArgumentException("The length len cannot be negative.");
-			}
-
-			int toRead = Math.min(len, remaining());
-			this.segment.get(this.position, b, off, toRead);
-			this.position += toRead;
-
-			return toRead;
-		}
-
-		@Override
-		public int read(byte[] b) throws IOException {
-			return read(b, 0, b.length);
-		}
-	}
-
 	// -----------------------------------------------------------------------------------------------------------------
 
 	private static final class SpanningWrapper {