You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/15 22:59:03 UTC

flink git commit: [FLINK-5771] [core] Fix multi-char delimiter detection in DelimitedInputFormat.

Repository: flink
Updated Branches:
  refs/heads/release-1.2 1037ace42 -> 3b4f6cf8c


[FLINK-5771] [core] Fix multi-char delimiter detection in DelimitedInputFormat.

- Add a test case to validate correct delimiter detection.
- Remove a couple of try-catch blocks from existing tests.

This closes #3316.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b4f6cf8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b4f6cf8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b4f6cf8

Branch: refs/heads/release-1.2
Commit: 3b4f6cf8c8283c221c8ab58f544cfe01c092fe6b
Parents: 1037ace
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Feb 14 22:02:26 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Feb 15 23:58:34 2017 +0100

----------------------------------------------------------------------
 .../api/common/io/DelimitedInputFormat.java     |  11 +-
 .../api/common/io/DelimitedInputFormatTest.java | 313 +++++++++----------
 2 files changed, 164 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b4f6cf8/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 5c8dfc1..a83d45f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -567,17 +567,24 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 			int startPos = this.readPos;
 			int count;
 
+			// Search for next occurrence of delimiter in read buffer.
 			while (this.readPos < this.limit && i < this.delimiter.length) {
-				if ((this.readBuffer[this.readPos++]) == this.delimiter[i]) {
+				if ((this.readBuffer[this.readPos]) == this.delimiter[i]) {
+					// Found the expected delimiter character. Continue looking for the next character of delimiter.
 					i++;
 				} else {
+					// Delimiter does not match.
+					// We have to reset the read position to the character after the first matching character
+					//   and search for the whole delimiter again.
+					readPos -= i;
 					i = 0;
 				}
+				readPos++;
 			}
 
 			// check why we dropped out
 			if (i == this.delimiter.length) {
-				// line end
+				// delimiter found
 				int totalBytesRead = this.readPos - startPos;
 				this.offset += countInWrapBuffer + totalBytesRead;
 				count = totalBytesRead - this.delimiter.length;

http://git-wip-us.apache.org/repos/asf/flink/blob/3b4f6cf8/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 219365a..7ce0a2e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -173,32 +173,53 @@ public class DelimitedInputFormatTest {
 	}
 	
 	@Test
-	public void testReadCustomDelimiter() {
-		try {
-			final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
-			final FileInputSplit split = createTempFile(myString);
-			
-			final Configuration parameters = new Configuration();
-			
-			format.setDelimiter("$$$");
-			format.configure(parameters);
-			format.open(split);
-	
-			String first = format.nextRecord(null);
-			assertNotNull(first);
-			assertEquals("my key|my val", first);
-
-			String second = format.nextRecord(null);
-			assertNotNull(second);
-			assertEquals("my key2\n$$ctd.$$|my value2", second);
-			
-			assertNull(format.nextRecord(null));
-			assertTrue(format.reachedEnd());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public void testReadCustomDelimiter() throws IOException {
+		final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
+		final FileInputSplit split = createTempFile(myString);
+
+		final Configuration parameters = new Configuration();
+
+		format.setDelimiter("$$$");
+		format.configure(parameters);
+		format.open(split);
+
+		String first = format.nextRecord(null);
+		assertNotNull(first);
+		assertEquals("my key|my val", first);
+
+		String second = format.nextRecord(null);
+		assertNotNull(second);
+		assertEquals("my key2\n$$ctd.$$|my value2", second);
+
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void testMultiCharDelimiter() throws IOException {
+		final String myString = "www112xx1123yyy11123zzzzz1123";
+		final FileInputSplit split = createTempFile(myString);
+
+		final Configuration parameters = new Configuration();
+
+		format.setDelimiter("1123");
+		format.configure(parameters);
+		format.open(split);
+
+		String first = format.nextRecord(null);
+		assertNotNull(first);
+		assertEquals("www112xx", first);
+
+		String second = format.nextRecord(null);
+		assertNotNull(second);
+		assertEquals("yyy1", second);
+
+		String third = format.nextRecord(null);
+		assertNotNull(third);
+		assertEquals("zzzzz", third);
+
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
 	}
 
 	@Test
@@ -244,164 +265,140 @@ public class DelimitedInputFormatTest {
 	 * Tests that the records are read correctly when the split boundary is in the middle of a record.
 	 */
 	@Test
-	public void testReadOverSplitBoundariesUnaligned() {
-		try {
-			final String myString = "value1\nvalue2\nvalue3";
-			final FileInputSplit split = createTempFile(myString);
-			
-			FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
-			FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
-
-			final Configuration parameters = new Configuration();
-			
-			format.configure(parameters);
-			format.open(split1);
-			
-			assertEquals("value1", format.nextRecord(null));
-			assertEquals("value2", format.nextRecord(null));
-			assertNull(format.nextRecord(null));
-			assertTrue(format.reachedEnd());
-			
-			format.close();
-			format.open(split2);
+	public void testReadOverSplitBoundariesUnaligned() throws IOException {
+		final String myString = "value1\nvalue2\nvalue3";
+		final FileInputSplit split = createTempFile(myString);
 
-			assertEquals("value3", format.nextRecord(null));
-			assertNull(format.nextRecord(null));
-			assertTrue(format.reachedEnd());
-			
-			format.close();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+		FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+
+		final Configuration parameters = new Configuration();
+
+		format.configure(parameters);
+		format.open(split1);
+
+		assertEquals("value1", format.nextRecord(null));
+		assertEquals("value2", format.nextRecord(null));
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+
+		format.close();
+		format.open(split2);
+
+		assertEquals("value3", format.nextRecord(null));
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+
+		format.close();
 	}
 
 	/**
 	 * Tests that the correct number of records is read when the split boundary is exact at the record boundary.
 	 */
 	@Test
-	public void testReadWithBufferSizeIsMultple() {
-		try {
-			final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
-			final FileInputSplit split = createTempFile(myString);
-
-			FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
-			FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+	public void testReadWithBufferSizeIsMultiple() throws IOException {
+		final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
+		final FileInputSplit split = createTempFile(myString);
 
-			final Configuration parameters = new Configuration();
+		FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+		FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
 
-			format.setBufferSize(2 * ((int) split1.getLength()));
-			format.configure(parameters);
+		final Configuration parameters = new Configuration();
 
-			String next;
-			int count = 0;
+		format.setBufferSize(2 * ((int) split1.getLength()));
+		format.configure(parameters);
 
-			// read split 1
-			format.open(split1);
-			while ((next = format.nextRecord(null)) != null) {
-				assertEquals(7, next.length());
-				count++;
-			}
-			assertNull(format.nextRecord(null));
-			assertTrue(format.reachedEnd());
-			format.close();
-			
-			// this one must have read one too many, because the next split will skipp the trailing remainder
-			// which happens to be one full record
-			assertEquals(3, count);
-
-			// read split 2
-			format.open(split2);
-			while ((next = format.nextRecord(null)) != null) {
-				assertEquals(7, next.length());
-				count++;
-			}
-			format.close();
+		String next;
+		int count = 0;
 
-			assertEquals(4, count);
+		// read split 1
+		format.open(split1);
+		while ((next = format.nextRecord(null)) != null) {
+			assertEquals(7, next.length());
+			count++;
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+		format.close();
+
+		// this one must have read one too many, because the next split will skip the trailing remainder
+		// which happens to be one full record
+		assertEquals(3, count);
+
+		// read split 2
+		format.open(split2);
+		while ((next = format.nextRecord(null)) != null) {
+			assertEquals(7, next.length());
+			count++;
 		}
+		format.close();
+
+		assertEquals(4, count);
 	}
 
 	@Test
-	public void testReadExactlyBufferSize() {
-		try {
-			final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
-			
-			final FileInputSplit split = createTempFile(myString);
-			final Configuration parameters = new Configuration();
-			
-			format.setBufferSize((int) split.getLength());
-			format.configure(parameters);
-			format.open(split);
+	public void testReadExactlyBufferSize() throws IOException {
+		final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
 
-			String next;
-			int count = 0;
-			while ((next = format.nextRecord(null)) != null) {
-				assertEquals(7, next.length());
-				count++;
-			}
-			assertNull(format.nextRecord(null));
-			assertTrue(format.reachedEnd());
+		final FileInputSplit split = createTempFile(myString);
+		final Configuration parameters = new Configuration();
 
-			format.close();
-			
-			assertEquals(4, count);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+		format.setBufferSize((int) split.getLength());
+		format.configure(parameters);
+		format.open(split);
+
+		String next;
+		int count = 0;
+		while ((next = format.nextRecord(null)) != null) {
+			assertEquals(7, next.length());
+			count++;
 		}
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+
+		format.close();
+
+		assertEquals(4, count);
 	}
 
 	@Test
-	public void testReadRecordsLargerThanBuffer() {
-		try {
-			final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" +
-									"bbbbbbbbbbbbbbbbbbbbbbbbb\n" +
-									"ccccccccccccccccccc\n" +
-									"ddddddddddddddddddddddddddddddddddd\n";
-
-			final FileInputSplit split = createTempFile(myString);
-			FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
-			FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
-			
-			final Configuration parameters = new Configuration();
-
-			format.setBufferSize(8);
-			format.configure(parameters);
-
-			String next;
-			List<String> result = new ArrayList<String>();
-			
-			
-			format.open(split1);
-			while ((next = format.nextRecord(null)) != null) {
-				result.add(next);
-			}
-			assertNull(format.nextRecord(null));
-			assertTrue(format.reachedEnd());
-			format.close();
+	public void testReadRecordsLargerThanBuffer() throws IOException {
+		final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" +
+								"bbbbbbbbbbbbbbbbbbbbbbbbb\n" +
+								"ccccccccccccccccccc\n" +
+								"ddddddddddddddddddddddddddddddddddd\n";
 
-			format.open(split2);
-			while ((next = format.nextRecord(null)) != null) {
-				result.add(next);
-			}
-			assertNull(format.nextRecord(null));
-			assertTrue(format.reachedEnd());
-			format.close();
-			
-			assertEquals(4, result.size());
-			assertEquals(Arrays.asList(myString.split("\n")), result);
+		final FileInputSplit split = createTempFile(myString);
+		FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+		FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+
+		final Configuration parameters = new Configuration();
+
+		format.setBufferSize(8);
+		format.configure(parameters);
+
+		String next;
+		List<String> result = new ArrayList<String>();
+
+
+		format.open(split1);
+		while ((next = format.nextRecord(null)) != null) {
+			result.add(next);
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+		format.close();
+
+		format.open(split2);
+		while ((next = format.nextRecord(null)) != null) {
+			result.add(next);
 		}
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+		format.close();
+
+		assertEquals(4, result.size());
+		assertEquals(Arrays.asList(myString.split("\n")), result);
 	}
 
 	static FileInputSplit createTempFile(String contents) throws IOException {