You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/20 14:59:17 UTC

flink git commit: [FLINK-6652] [core] Fix handling of delimiters split by buffers in DelimitedInputFormat

Repository: flink
Updated Branches:
  refs/heads/master f24a499b3 -> be662bf7e


[FLINK-6652] [core] Fix handling of delimiters split by buffers in DelimitedInputFormat

This closes #4088.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be662bf7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be662bf7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be662bf7

Branch: refs/heads/master
Commit: be662bf7ebcefb289988a24392104c3385029568
Parents: f24a499
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Jun 7 23:01:06 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jun 20 16:58:51 2017 +0200

----------------------------------------------------------------------
 .../api/common/io/DelimitedInputFormat.java     | 99 +++++++++++++-------
 .../api/common/io/DelimitedInputFormatTest.java | 28 +++++-
 2 files changed, 93 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be662bf7/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index e20f646..4d715e7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -266,10 +266,10 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 	}
 	
 	public void setBufferSize(int bufferSize) {
-		if (bufferSize < 1) {
-			throw new IllegalArgumentException("Buffer size must be at least 1.");
+		if (bufferSize < 2) {
+			throw new IllegalArgumentException("Buffer size must be at least 2.");
 		}
-		
+
 		this.bufferSize = bufferSize;
 	}
 	
@@ -487,13 +487,17 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 				this.end = true;
 			}
 		} else {
-			fillBuffer();
+			fillBuffer(0);
 		}
 	}
 
 	private void initBuffers() {
 		this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
 
+		if (this.bufferSize <= this.delimiter.length) {
+			throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
+		}
+
 		if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
 			this.readBuffer = new byte[this.bufferSize];
 		}
@@ -548,13 +552,30 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 
 		int countInWrapBuffer = 0;
 
-		/* position of matching positions in the delimiter byte array */
-		int i = 0;
+		// position of matching positions in the delimiter byte array
+		int delimPos = 0;
 
 		while (true) {
 			if (this.readPos >= this.limit) {
-				if (!fillBuffer()) {
-					if (countInWrapBuffer > 0) {
+				// readBuffer is completely consumed. Fill it again but keep partially read delimiter bytes.
+				if (!fillBuffer(delimPos)) {
+					int countInReadBuffer = delimPos;
+					if (countInWrapBuffer + countInReadBuffer > 0) {
+						// we have bytes left to emit
+						if (countInReadBuffer > 0) {
+							// we have bytes left in the readBuffer. Move them into the wrapBuffer
+							if (this.wrapBuffer.length - countInWrapBuffer < countInReadBuffer) {
+								// reallocate
+								byte[] tmp = new byte[countInWrapBuffer + countInReadBuffer];
+								System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
+								this.wrapBuffer = tmp;
+							}
+
+							// copy readBuffer bytes to wrapBuffer
+							System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, countInReadBuffer);
+							countInWrapBuffer += countInReadBuffer;
+						}
+
 						this.offset += countInWrapBuffer;
 						setResult(this.wrapBuffer, 0, countInWrapBuffer);
 						return true;
@@ -564,30 +585,30 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 				}
 			}
 
-			int startPos = this.readPos;
+			int startPos = this.readPos - delimPos;
 			int count;
 
 			// Search for next occurence of delimiter in read buffer.
-			while (this.readPos < this.limit && i < this.delimiter.length) {
-				if ((this.readBuffer[this.readPos]) == this.delimiter[i]) {
+			while (this.readPos < this.limit && delimPos < this.delimiter.length) {
+				if ((this.readBuffer[this.readPos]) == this.delimiter[delimPos]) {
 					// Found the expected delimiter character. Continue looking for the next character of delimiter.
-					i++;
+					delimPos++;
 				} else {
 					// Delimiter does not match.
 					// We have to reset the read position to the character after the first matching character
 					//   and search for the whole delimiter again.
-					readPos -= i;
-					i = 0;
+					readPos -= delimPos;
+					delimPos = 0;
 				}
 				readPos++;
 			}
 
 			// check why we dropped out
-			if (i == this.delimiter.length) {
-				// delimiter found
-				int totalBytesRead = this.readPos - startPos;
-				this.offset += countInWrapBuffer + totalBytesRead;
-				count = totalBytesRead - this.delimiter.length;
+			if (delimPos == this.delimiter.length) {
+				// we found a delimiter
+				int readBufferBytesRead = this.readPos - startPos;
+				this.offset += countInWrapBuffer + readBufferBytesRead;
+				count = readBufferBytesRead - this.delimiter.length;
 
 				// copy to byte array
 				if (countInWrapBuffer > 0) {
@@ -607,6 +628,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 					return true;
 				}
 			} else {
+				// we reached the end of the readBuffer
 				count = this.limit - startPos;
 				
 				// check against the maximum record length
@@ -615,16 +637,23 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 							this.lineLengthLimit + ").");
 				}
 
-				// buffer exhausted
-				if (this.wrapBuffer.length - countInWrapBuffer < count) {
+				// Compute number of bytes to move to wrapBuffer
+				// Chars of partially read delimiter must remain in the readBuffer. We might need to go back.
+				int bytesToMove = count - delimPos;
+				// ensure wrapBuffer is large enough
+				if (this.wrapBuffer.length - countInWrapBuffer < bytesToMove) {
 					// reallocate
-					byte[] tmp = new byte[Math.max(this.wrapBuffer.length * 2, countInWrapBuffer + count)];
+					byte[] tmp = new byte[Math.max(this.wrapBuffer.length * 2, countInWrapBuffer + bytesToMove)];
 					System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
 					this.wrapBuffer = tmp;
 				}
 
-				System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, count);
-				countInWrapBuffer += count;
+				// copy readBuffer to wrapBuffer (except delimiter chars)
+				System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, bytesToMove);
+				countInWrapBuffer += bytesToMove;
+				// move delimiter chars to the beginning of the readBuffer
+				System.arraycopy(this.readBuffer, this.readPos - delimPos, this.readBuffer, 0, delimPos);
+
 			}
 		}
 	}
@@ -635,16 +664,20 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 		this.currLen = len;
 	}
 
-	private boolean fillBuffer() throws IOException {
+	/**
+	 * Fills the read buffer with bytes read from the file starting from an offset.
+	 */
+	private boolean fillBuffer(int offset) throws IOException {
+		int maxReadLength = this.readBuffer.length - offset;
 		// special case for reading the whole split.
 		if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
-			int read = this.stream.read(this.readBuffer, 0, readBuffer.length);
+			int read = this.stream.read(this.readBuffer, offset, maxReadLength);
 			if (read == -1) {
 				this.stream.close();
 				this.stream = null;
 				return false;
 			} else {
-				this.readPos = 0;
+				this.readPos = offset;
 				this.limit = read;
 				return true;
 			}
@@ -654,7 +687,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 		int toRead;
 		if (this.splitLength > 0) {
 			// if we have more data, read that
-			toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength;
+			toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
 		}
 		else {
 			// if we have exhausted our split, we need to complete the current record, or read one
@@ -662,11 +695,11 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 			// the reason is that the next split will skip over the beginning until it finds the first
 			// delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
 			// previous split.
-			toRead = this.readBuffer.length;
+			toRead = maxReadLength;
 			this.overLimit = true;
 		}
 
-		int read = this.stream.read(this.readBuffer, 0, toRead);
+		int read = this.stream.read(this.readBuffer, offset, toRead);
 
 		if (read == -1) {
 			this.stream.close();
@@ -674,8 +707,8 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 			return false;
 		} else {
 			this.splitLength -= read;
-			this.readPos = 0;
-			this.limit = read;
+			this.readPos = offset; // position from where to start reading
+			this.limit = read + offset; // number of valid bytes in the read buffer
 			return true;
 		}
 	}
@@ -726,7 +759,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 			this.stream.seek(this.offset);
 			if (split.getLength() == -1) {
 				// this is the case for unsplittable files
-				fillBuffer();
+				fillBuffer(0);
 			} else {
 				this.splitLength = this.splitStart + split.getLength() - this.offset;
 				if (splitLength <= 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/be662bf7/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 2ff5ee7..e2df391 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -45,7 +45,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class DelimitedInputFormatTest {
 	
@@ -402,6 +401,33 @@ public class DelimitedInputFormatTest {
 		assertEquals(Arrays.asList(myString.split("\n")), result);
 	}
 
+	@Test
+	public void testDelimiterOnBufferBoundary() throws IOException {
+
+		String[] records = new String[]{"1234567890<DEL?NO!>1234567890", "1234567890<DEL?NO!>1234567890", "<DEL?NO!>"};
+		String delimiter = "<DELIM>";
+		String fileContent = StringUtils.join(records, delimiter);
+
+
+		final FileInputSplit split = createTempFile(fileContent);
+		final Configuration parameters = new Configuration();
+
+		format.setBufferSize(12);
+		format.setDelimiter(delimiter);
+		format.configure(parameters);
+		format.open(split);
+
+		for (String record : records) {
+			String value = format.nextRecord(null);
+			assertEquals(record, value);
+		}
+
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+
+		format.close();
+	}
+
 	static FileInputSplit createTempFile(String contents) throws IOException {
 		File tempFile = File.createTempFile("test_contents", "tmp");
 		tempFile.deleteOnExit();