You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/19 15:12:02 UTC
[flink] 02/10: [FLINK-17547][task][hotfix] Extract
NonSpanningWrapper from SpillingAdaptiveSpanningRecordDeserializer (static
inner class). As is, no logical changes.
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 39f5f1b0f09c37400ba113fdf33f90a832de5f0d
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed May 6 17:54:05 2020 +0200
[FLINK-17547][task][hotfix] Extract NonSpanningWrapper from
SpillingAdaptiveSpanningRecordDeserializer (static inner class)
As it is, no logical changes.
---
.../api/serialization/NonSpanningWrapper.java | 296 +++++++++++++++++++++
...SpillingAdaptiveSpanningRecordDeserializer.java | 271 -------------------
2 files changed, 296 insertions(+), 271 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
new file mode 100644
index 0000000..bab50fa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.util.Optional;
+
+final class NonSpanningWrapper implements DataInputView {
+ // DataInputView that reads from a single MemorySegment at a moving position; limit marks the end of the readable range.
+ MemorySegment segment; // backing segment; set to null by clear()
+
+ private int limit; // end offset of the readable range; consulted by remaining(), skipBytes() and read()
+
+ int position; // offset in the segment of the next byte to read
+
+ private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
+ private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
+ // Number of bytes between the current position and the limit.
+ int remaining() {
+ return this.limit - this.position;
+ }
+ // Drops the segment reference and resets limit and position to 0; the cached UTF buffers are kept.
+ void clear() {
+ this.segment = null;
+ this.limit = 0;
+ this.position = 0;
+ }
+ // Points this wrapper at seg with the given read position; leftOverLimit becomes the new limit.
+ void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
+ this.segment = seg;
+ this.position = position;
+ this.limit = leftOverLimit;
+ }
+ // Copies the remaining() unread bytes into a segment from allocateUnpooledSegment; empty Optional if nothing remains. Position is not advanced.
+ Optional<MemorySegment> getUnconsumedSegment() {
+ if (remaining() == 0) {
+ return Optional.empty();
+ }
+ MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining());
+ segment.copyTo(position, target, 0, remaining());
+ return Optional.of(target);
+ }
+
+ // -------------------------------------------------------------------------------------------------------------
+ // DataInput specific methods
+ // -------------------------------------------------------------------------------------------------------------
+ // Fills b completely; same as readFully(b, 0, b.length).
+ @Override
+ public final void readFully(byte[] b) throws IOException {
+ readFully(b, 0, b.length);
+ }
+ // Copies len bytes from the current position into b starting at off, then advances the position by len.
+ @Override
+ public final void readFully(byte[] b, int off, int len) throws IOException {
+ if (off < 0 || len < 0 || off + len > b.length) {
+ throw new IndexOutOfBoundsException();
+ }
+ // NOTE(review): len is not compared with remaining() here; presumably segment.get rejects out-of-range reads - confirm.
+ this.segment.get(this.position, b, off, len);
+ this.position += len;
+ }
+ // True only when the next byte equals 1.
+ @Override
+ public final boolean readBoolean() throws IOException {
+ return readByte() == 1;
+ }
+ // Returns the byte at the current position and advances by one; no check against limit is made here.
+ @Override
+ public final byte readByte() throws IOException {
+ return this.segment.get(this.position++);
+ }
+ // Next byte as a value in 0..255.
+ @Override
+ public final int readUnsignedByte() throws IOException {
+ return readByte() & 0xff;
+ }
+ // Big-endian 16-bit signed value at the current position; advances by 2.
+ @Override
+ public final short readShort() throws IOException {
+ final short v = this.segment.getShortBigEndian(this.position);
+ this.position += 2;
+ return v;
+ }
+ // Big-endian 16-bit value masked to 0..65535; advances by 2.
+ @Override
+ public final int readUnsignedShort() throws IOException {
+ final int v = this.segment.getShortBigEndian(this.position) & 0xffff;
+ this.position += 2;
+ return v;
+ }
+ // Big-endian char at the current position; advances by 2.
+ @Override
+ public final char readChar() throws IOException {
+ final char v = this.segment.getCharBigEndian(this.position);
+ this.position += 2;
+ return v;
+ }
+ // Big-endian 32-bit value at the current position; advances by 4.
+ @Override
+ public final int readInt() throws IOException {
+ final int v = this.segment.getIntBigEndian(this.position);
+ this.position += 4;
+ return v;
+ }
+ // Big-endian 64-bit value at the current position; advances by 8.
+ @Override
+ public final long readLong() throws IOException {
+ final long v = this.segment.getLongBigEndian(this.position);
+ this.position += 8;
+ return v;
+ }
+ // Reinterprets the bits returned by readInt() as a float.
+ @Override
+ public final float readFloat() throws IOException {
+ return Float.intBitsToFloat(readInt());
+ }
+ // Reinterprets the bits returned by readLong() as a double.
+ @Override
+ public final double readDouble() throws IOException {
+ return Double.longBitsToDouble(readLong());
+ }
+ // Reads bytes up to the next '\n', skipping every '\r'; each byte becomes one char. Returns null if no chars were collected.
+ @Override
+ public final String readLine() throws IOException {
+ final StringBuilder bld = new StringBuilder(32);
+
+ try {
+ int b;
+ while ((b = readUnsignedByte()) != '\n') {
+ if (b != '\r') {
+ bld.append((char) b);
+ }
+ }
+ }
+ catch (EOFException ignored) {} // end of input ends the line; NOTE(review): confirm the read path can actually raise EOFException
+ // nothing collected (this is also the case for an empty line) -> null
+ if (bld.length() == 0) {
+ return null;
+ }
+
+ // trim a trailing carriage return (NOTE(review): appears unreachable, since '\r' is never appended above)
+ int len = bld.length();
+ if (len > 0 && bld.charAt(len - 1) == '\r') {
+ bld.setLength(len - 1);
+ }
+ return bld.toString();
+ }
+ // Reads an unsigned-short byte length, then that many bytes, and decodes them as 1-, 2- and 3-byte sequences into a String.
+ @Override
+ public final String readUTF() throws IOException {
+ final int utflen = readUnsignedShort();
+
+ final byte[] bytearr;
+ final char[] chararr;
+ // reuse the cached buffers when they are large enough, otherwise allocate and cache bigger ones
+ if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
+ bytearr = new byte[utflen];
+ this.utfByteBuffer = bytearr;
+ } else {
+ bytearr = this.utfByteBuffer;
+ }
+ if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
+ chararr = new char[utflen];
+ this.utfCharBuffer = chararr;
+ } else {
+ chararr = this.utfCharBuffer;
+ }
+
+ int c, char2, char3;
+ int count = 0; // index of the next encoded byte in bytearr
+ int chararrCount = 0; // number of chars decoded so far
+
+ readFully(bytearr, 0, utflen);
+ // first pass: copy leading single-byte (<= 127) characters until a multi-byte lead byte shows up
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ if (c > 127) {
+ break;
+ }
+ count++;
+ chararr[chararrCount++] = (char) c;
+ }
+ // second pass: decode the rest, dispatching on the high nibble of each lead byte
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ switch (c >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7: // 0xxxxxxx: single-byte character
+ count++;
+ chararr[chararrCount++] = (char) c;
+ break;
+ case 12:
+ case 13: // 110xxxxx 10xxxxxx: two-byte character
+ count += 2;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 1];
+ if ((char2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+ break;
+ case 14: // 1110xxxx 10xxxxxx 10xxxxxx: three-byte character
+ count += 3;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 2];
+ char3 = (int) bytearr[count - 1];
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+ }
+ chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
+ break;
+ default: // 10xxxxxx or 1111xxxx lead byte: rejected
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ }
+ // The number of chars produced may be less than utflen
+ return new String(chararr, 0, chararrCount);
+ }
+ // Advances the position by at most n bytes, capped at remaining(); returns the number actually skipped. Negative n is rejected.
+ @Override
+ public final int skipBytes(int n) throws IOException {
+ if (n < 0) {
+ throw new IllegalArgumentException();
+ }
+
+ int toSkip = Math.min(n, remaining());
+ this.position += toSkip;
+ return toSkip;
+ }
+ // Like skipBytes, but throws EOFException when fewer than numBytes could be skipped; the position has already moved by then.
+ @Override
+ public void skipBytesToRead(int numBytes) throws IOException {
+ int skippedBytes = skipBytes(numBytes);
+
+ if (skippedBytes < numBytes){
+ throw new EOFException("Could not skip " + numBytes + " bytes.");
+ }
+ }
+ // Copies up to len bytes (capped at remaining()) into b at off and returns the count, which is 0 when nothing remains.
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (b == null){
+ throw new NullPointerException("Byte array b cannot be null.");
+ }
+
+ if (off < 0){
+ throw new IllegalArgumentException("The offset off cannot be negative.");
+ }
+
+ if (len < 0){
+ throw new IllegalArgumentException("The length len cannot be negative.");
+ }
+
+ int toRead = Math.min(len, remaining());
+ this.segment.get(this.position, b, off, toRead);
+ this.position += toRead;
+
+ return toRead;
+ }
+ // Same as read(b, 0, b.length).
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 346bdfc..5003e78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -32,12 +32,10 @@ import org.apache.flink.util.FileUtils;
import org.apache.flink.util.StringUtils;
import java.io.BufferedInputStream;
-import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.io.UTFDataFormatException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
@@ -184,275 +182,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
// -----------------------------------------------------------------------------------------------------------------
- private static final class NonSpanningWrapper implements DataInputView {
-
- private MemorySegment segment;
-
- private int limit;
-
- private int position;
-
- private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
- private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
-
- int remaining() {
- return this.limit - this.position;
- }
-
- void clear() {
- this.segment = null;
- this.limit = 0;
- this.position = 0;
- }
-
- void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
- this.segment = seg;
- this.position = position;
- this.limit = leftOverLimit;
- }
-
- Optional<MemorySegment> getUnconsumedSegment() {
- if (remaining() == 0) {
- return Optional.empty();
- }
- MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining());
- segment.copyTo(position, target, 0, remaining());
- return Optional.of(target);
- }
-
- // -------------------------------------------------------------------------------------------------------------
- // DataInput specific methods
- // -------------------------------------------------------------------------------------------------------------
-
- @Override
- public final void readFully(byte[] b) throws IOException {
- readFully(b, 0, b.length);
- }
-
- @Override
- public final void readFully(byte[] b, int off, int len) throws IOException {
- if (off < 0 || len < 0 || off + len > b.length) {
- throw new IndexOutOfBoundsException();
- }
-
- this.segment.get(this.position, b, off, len);
- this.position += len;
- }
-
- @Override
- public final boolean readBoolean() throws IOException {
- return readByte() == 1;
- }
-
- @Override
- public final byte readByte() throws IOException {
- return this.segment.get(this.position++);
- }
-
- @Override
- public final int readUnsignedByte() throws IOException {
- return readByte() & 0xff;
- }
-
- @Override
- public final short readShort() throws IOException {
- final short v = this.segment.getShortBigEndian(this.position);
- this.position += 2;
- return v;
- }
-
- @Override
- public final int readUnsignedShort() throws IOException {
- final int v = this.segment.getShortBigEndian(this.position) & 0xffff;
- this.position += 2;
- return v;
- }
-
- @Override
- public final char readChar() throws IOException {
- final char v = this.segment.getCharBigEndian(this.position);
- this.position += 2;
- return v;
- }
-
- @Override
- public final int readInt() throws IOException {
- final int v = this.segment.getIntBigEndian(this.position);
- this.position += 4;
- return v;
- }
-
- @Override
- public final long readLong() throws IOException {
- final long v = this.segment.getLongBigEndian(this.position);
- this.position += 8;
- return v;
- }
-
- @Override
- public final float readFloat() throws IOException {
- return Float.intBitsToFloat(readInt());
- }
-
- @Override
- public final double readDouble() throws IOException {
- return Double.longBitsToDouble(readLong());
- }
-
- @Override
- public final String readLine() throws IOException {
- final StringBuilder bld = new StringBuilder(32);
-
- try {
- int b;
- while ((b = readUnsignedByte()) != '\n') {
- if (b != '\r') {
- bld.append((char) b);
- }
- }
- }
- catch (EOFException ignored) {}
-
- if (bld.length() == 0) {
- return null;
- }
-
- // trim a trailing carriage return
- int len = bld.length();
- if (len > 0 && bld.charAt(len - 1) == '\r') {
- bld.setLength(len - 1);
- }
- return bld.toString();
- }
-
- @Override
- public final String readUTF() throws IOException {
- final int utflen = readUnsignedShort();
-
- final byte[] bytearr;
- final char[] chararr;
-
- if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
- bytearr = new byte[utflen];
- this.utfByteBuffer = bytearr;
- } else {
- bytearr = this.utfByteBuffer;
- }
- if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
- chararr = new char[utflen];
- this.utfCharBuffer = chararr;
- } else {
- chararr = this.utfCharBuffer;
- }
-
- int c, char2, char3;
- int count = 0;
- int chararrCount = 0;
-
- readFully(bytearr, 0, utflen);
-
- while (count < utflen) {
- c = (int) bytearr[count] & 0xff;
- if (c > 127) {
- break;
- }
- count++;
- chararr[chararrCount++] = (char) c;
- }
-
- while (count < utflen) {
- c = (int) bytearr[count] & 0xff;
- switch (c >> 4) {
- case 0:
- case 1:
- case 2:
- case 3:
- case 4:
- case 5:
- case 6:
- case 7:
- count++;
- chararr[chararrCount++] = (char) c;
- break;
- case 12:
- case 13:
- count += 2;
- if (count > utflen) {
- throw new UTFDataFormatException("malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 1];
- if ((char2 & 0xC0) != 0x80) {
- throw new UTFDataFormatException("malformed input around byte " + count);
- }
- chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
- break;
- case 14:
- count += 3;
- if (count > utflen) {
- throw new UTFDataFormatException("malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 2];
- char3 = (int) bytearr[count - 1];
- if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
- throw new UTFDataFormatException("malformed input around byte " + (count - 1));
- }
- chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
- break;
- default:
- throw new UTFDataFormatException("malformed input around byte " + count);
- }
- }
- // The number of chars produced may be less than utflen
- return new String(chararr, 0, chararrCount);
- }
-
- @Override
- public final int skipBytes(int n) throws IOException {
- if (n < 0) {
- throw new IllegalArgumentException();
- }
-
- int toSkip = Math.min(n, remaining());
- this.position += toSkip;
- return toSkip;
- }
-
- @Override
- public void skipBytesToRead(int numBytes) throws IOException {
- int skippedBytes = skipBytes(numBytes);
-
- if (skippedBytes < numBytes){
- throw new EOFException("Could not skip " + numBytes + " bytes.");
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- if (b == null){
- throw new NullPointerException("Byte array b cannot be null.");
- }
-
- if (off < 0){
- throw new IllegalArgumentException("The offset off cannot be negative.");
- }
-
- if (len < 0){
- throw new IllegalArgumentException("The length len cannot be negative.");
- }
-
- int toRead = Math.min(len, remaining());
- this.segment.get(this.position, b, off, toRead);
- this.position += toRead;
-
- return toRead;
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
- }
-
// -----------------------------------------------------------------------------------------------------------------
private static final class SpanningWrapper {