You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/20 14:59:17 UTC
flink git commit: [FLINK-6652] [core] Fix handling of delimiters split by buffers in DelimitedInputFormat
Repository: flink
Updated Branches:
refs/heads/master f24a499b3 -> be662bf7e
[FLINK-6652] [core] Fix handling of delimiters split by buffers in DelimitedInputFormat
This closes #4088.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be662bf7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be662bf7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be662bf7
Branch: refs/heads/master
Commit: be662bf7ebcefb289988a24392104c3385029568
Parents: f24a499
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Jun 7 23:01:06 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jun 20 16:58:51 2017 +0200
----------------------------------------------------------------------
.../api/common/io/DelimitedInputFormat.java | 99 +++++++++++++-------
.../api/common/io/DelimitedInputFormatTest.java | 28 +++++-
2 files changed, 93 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/be662bf7/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index e20f646..4d715e7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -266,10 +266,10 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
}
public void setBufferSize(int bufferSize) {
- if (bufferSize < 1) {
- throw new IllegalArgumentException("Buffer size must be at least 1.");
+ if (bufferSize < 2) {
+ throw new IllegalArgumentException("Buffer size must be at least 2.");
}
-
+
this.bufferSize = bufferSize;
}
@@ -487,13 +487,17 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
this.end = true;
}
} else {
- fillBuffer();
+ fillBuffer(0);
}
}
private void initBuffers() {
this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
+ if (this.bufferSize <= this.delimiter.length) {
+ throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
+ }
+
if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
this.readBuffer = new byte[this.bufferSize];
}
@@ -548,13 +552,30 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
int countInWrapBuffer = 0;
- /* position of matching positions in the delimiter byte array */
- int i = 0;
+ // position of matching positions in the delimiter byte array
+ int delimPos = 0;
while (true) {
if (this.readPos >= this.limit) {
- if (!fillBuffer()) {
- if (countInWrapBuffer > 0) {
+ // readBuffer is completely consumed. Fill it again but keep partially read delimiter bytes.
+ if (!fillBuffer(delimPos)) {
+ int countInReadBuffer = delimPos;
+ if (countInWrapBuffer + countInReadBuffer > 0) {
+ // we have bytes left to emit
+ if (countInReadBuffer > 0) {
+ // we have bytes left in the readBuffer. Move them into the wrapBuffer
+ if (this.wrapBuffer.length - countInWrapBuffer < countInReadBuffer) {
+ // reallocate
+ byte[] tmp = new byte[countInWrapBuffer + countInReadBuffer];
+ System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
+ this.wrapBuffer = tmp;
+ }
+
+ // copy readBuffer bytes to wrapBuffer
+ System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, countInReadBuffer);
+ countInWrapBuffer += countInReadBuffer;
+ }
+
this.offset += countInWrapBuffer;
setResult(this.wrapBuffer, 0, countInWrapBuffer);
return true;
@@ -564,30 +585,30 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
}
}
- int startPos = this.readPos;
+ int startPos = this.readPos - delimPos;
int count;
// Search for next occurence of delimiter in read buffer.
- while (this.readPos < this.limit && i < this.delimiter.length) {
- if ((this.readBuffer[this.readPos]) == this.delimiter[i]) {
+ while (this.readPos < this.limit && delimPos < this.delimiter.length) {
+ if ((this.readBuffer[this.readPos]) == this.delimiter[delimPos]) {
// Found the expected delimiter character. Continue looking for the next character of delimiter.
- i++;
+ delimPos++;
} else {
// Delimiter does not match.
// We have to reset the read position to the character after the first matching character
// and search for the whole delimiter again.
- readPos -= i;
- i = 0;
+ readPos -= delimPos;
+ delimPos = 0;
}
readPos++;
}
// check why we dropped out
- if (i == this.delimiter.length) {
- // delimiter found
- int totalBytesRead = this.readPos - startPos;
- this.offset += countInWrapBuffer + totalBytesRead;
- count = totalBytesRead - this.delimiter.length;
+ if (delimPos == this.delimiter.length) {
+ // we found a delimiter
+ int readBufferBytesRead = this.readPos - startPos;
+ this.offset += countInWrapBuffer + readBufferBytesRead;
+ count = readBufferBytesRead - this.delimiter.length;
// copy to byte array
if (countInWrapBuffer > 0) {
@@ -607,6 +628,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
return true;
}
} else {
+ // we reached the end of the readBuffer
count = this.limit - startPos;
// check against the maximum record length
@@ -615,16 +637,23 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
this.lineLengthLimit + ").");
}
- // buffer exhausted
- if (this.wrapBuffer.length - countInWrapBuffer < count) {
+ // Compute number of bytes to move to wrapBuffer
+ // Chars of partially read delimiter must remain in the readBuffer. We might need to go back.
+ int bytesToMove = count - delimPos;
+ // ensure wrapBuffer is large enough
+ if (this.wrapBuffer.length - countInWrapBuffer < bytesToMove) {
// reallocate
- byte[] tmp = new byte[Math.max(this.wrapBuffer.length * 2, countInWrapBuffer + count)];
+ byte[] tmp = new byte[Math.max(this.wrapBuffer.length * 2, countInWrapBuffer + bytesToMove)];
System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
this.wrapBuffer = tmp;
}
- System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, count);
- countInWrapBuffer += count;
+ // copy readBuffer to wrapBuffer (except delimiter chars)
+ System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, bytesToMove);
+ countInWrapBuffer += bytesToMove;
+ // move delimiter chars to the beginning of the readBuffer
+ System.arraycopy(this.readBuffer, this.readPos - delimPos, this.readBuffer, 0, delimPos);
+
}
}
}
@@ -635,16 +664,20 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
this.currLen = len;
}
- private boolean fillBuffer() throws IOException {
+ /**
+ * Fills the read buffer from the stream, writing at {@code offset} and keeping bytes [0, offset) (a partially matched delimiter); returns false at end of stream.
+ */
+ private boolean fillBuffer(int offset) throws IOException {
+ int maxReadLength = this.readBuffer.length - offset;
// special case for reading the whole split.
if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
- int read = this.stream.read(this.readBuffer, 0, readBuffer.length);
+ int read = this.stream.read(this.readBuffer, offset, maxReadLength);
if (read == -1) {
this.stream.close();
this.stream = null;
return false;
} else {
- this.readPos = 0;
- this.limit = read;
+ this.readPos = offset; // position from where to start reading
+ this.limit = read + offset; // number of valid bytes in the read buffer (kept prefix + newly read bytes)
return true;
}
@@ -654,7 +687,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
int toRead;
if (this.splitLength > 0) {
// if we have more data, read that
- toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength;
+ toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
}
else {
// if we have exhausted our split, we need to complete the current record, or read one
@@ -662,11 +695,11 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
// the reason is that the next split will skip over the beginning until it finds the first
// delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
// previous split.
- toRead = this.readBuffer.length;
+ toRead = maxReadLength;
this.overLimit = true;
}
- int read = this.stream.read(this.readBuffer, 0, toRead);
+ int read = this.stream.read(this.readBuffer, offset, toRead);
if (read == -1) {
this.stream.close();
@@ -674,8 +707,8 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
return false;
} else {
this.splitLength -= read;
- this.readPos = 0;
- this.limit = read;
+ this.readPos = offset; // position from where to start reading
+ this.limit = read + offset; // number of valid bytes in the read buffer
return true;
}
}
@@ -726,7 +759,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
this.stream.seek(this.offset);
if (split.getLength() == -1) {
// this is the case for unsplittable files
- fillBuffer();
+ fillBuffer(0);
} else {
this.splitLength = this.splitStart + split.getLength() - this.offset;
if (splitLength <= 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/be662bf7/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 2ff5ee7..e2df391 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -45,7 +45,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
public class DelimitedInputFormatTest {
@@ -402,6 +401,33 @@ public class DelimitedInputFormatTest {
assertEquals(Arrays.asList(myString.split("\n")), result);
}
+ @Test
+ public void testDelimiterOnBufferBoundary() throws IOException {
+
+ String[] records = new String[]{"1234567890<DEL?NO!>1234567890", "1234567890<DEL?NO!>1234567890", "<DEL?NO!>"};
+ String delimiter = "<DELIM>";
+ String fileContent = StringUtils.join(records, delimiter);
+
+
+ final FileInputSplit split = createTempFile(fileContent);
+ final Configuration parameters = new Configuration();
+
+ format.setBufferSize(12);
+ format.setDelimiter(delimiter);
+ format.configure(parameters);
+ format.open(split);
+
+ for (String record : records) {
+ String value = format.nextRecord(null);
+ assertEquals(record, value);
+ }
+
+ assertNull(format.nextRecord(null));
+ assertTrue(format.reachedEnd());
+
+ format.close();
+ }
+
static FileInputSplit createTempFile(String contents) throws IOException {
File tempFile = File.createTempFile("test_contents", "tmp");
tempFile.deleteOnExit();