You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/15 22:59:03 UTC
flink git commit: [FLINK-5771] [core] Fix multi-char delimiter detection in DelimitedInputFormat.
Repository: flink
Updated Branches:
refs/heads/release-1.2 1037ace42 -> 3b4f6cf8c
[FLINK-5771] [core] Fix multi-char delimiter detection in DelimitedInputFormat.
- Add a test case to validate correct delimiter detection.
- Remove a couple of try-catch blocks from existing tests.
This closes #3316.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b4f6cf8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b4f6cf8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b4f6cf8
Branch: refs/heads/release-1.2
Commit: 3b4f6cf8c8283c221c8ab58f544cfe01c092fe6b
Parents: 1037ace
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Feb 14 22:02:26 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Feb 15 23:58:34 2017 +0100
----------------------------------------------------------------------
.../api/common/io/DelimitedInputFormat.java | 11 +-
.../api/common/io/DelimitedInputFormatTest.java | 313 +++++++++----------
2 files changed, 164 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3b4f6cf8/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 5c8dfc1..a83d45f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -567,17 +567,24 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
int startPos = this.readPos;
int count;
+ // Search for next occurrence of delimiter in read buffer.
while (this.readPos < this.limit && i < this.delimiter.length) {
- if ((this.readBuffer[this.readPos++]) == this.delimiter[i]) {
+ if ((this.readBuffer[this.readPos]) == this.delimiter[i]) {
+ // Found the expected delimiter character. Continue looking for the next character of delimiter.
i++;
} else {
+ // Delimiter does not match.
+ // We have to reset the read position to the character after the first matching character
+ // and search for the whole delimiter again.
+ readPos -= i;
i = 0;
}
+ readPos++;
}
// check why we dropped out
if (i == this.delimiter.length) {
- // line end
+ // delimiter found
int totalBytesRead = this.readPos - startPos;
this.offset += countInWrapBuffer + totalBytesRead;
count = totalBytesRead - this.delimiter.length;
http://git-wip-us.apache.org/repos/asf/flink/blob/3b4f6cf8/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 219365a..7ce0a2e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -173,32 +173,53 @@ public class DelimitedInputFormatTest {
}
@Test
- public void testReadCustomDelimiter() {
- try {
- final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
- final FileInputSplit split = createTempFile(myString);
-
- final Configuration parameters = new Configuration();
-
- format.setDelimiter("$$$");
- format.configure(parameters);
- format.open(split);
-
- String first = format.nextRecord(null);
- assertNotNull(first);
- assertEquals("my key|my val", first);
-
- String second = format.nextRecord(null);
- assertNotNull(second);
- assertEquals("my key2\n$$ctd.$$|my value2", second);
-
- assertNull(format.nextRecord(null));
- assertTrue(format.reachedEnd());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ public void testReadCustomDelimiter() throws IOException {
+ final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
+ final FileInputSplit split = createTempFile(myString);
+
+ final Configuration parameters = new Configuration();
+
+ format.setDelimiter("$$$");
+ format.configure(parameters);
+ format.open(split);
+
+ String first = format.nextRecord(null);
+ assertNotNull(first);
+ assertEquals("my key|my val", first);
+
+ String second = format.nextRecord(null);
+ assertNotNull(second);
+ assertEquals("my key2\n$$ctd.$$|my value2", second);
+
+ assertNull(format.nextRecord(null));
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void testMultiCharDelimiter() throws IOException {
+ final String myString = "www112xx1123yyy11123zzzzz1123";
+ final FileInputSplit split = createTempFile(myString);
+
+ final Configuration parameters = new Configuration();
+
+ format.setDelimiter("1123");
+ format.configure(parameters);
+ format.open(split);
+
+ String first = format.nextRecord(null);
+ assertNotNull(first);
+ assertEquals("www112xx", first);
+
+ String second = format.nextRecord(null);
+ assertNotNull(second);
+ assertEquals("yyy1", second);
+
+ String third = format.nextRecord(null);
+ assertNotNull(third);
+ assertEquals("zzzzz", third);
+
+ assertNull(format.nextRecord(null));
+ assertTrue(format.reachedEnd());
}
@Test
@@ -244,164 +265,140 @@ public class DelimitedInputFormatTest {
* Tests that the records are read correctly when the split boundary is in the middle of a record.
*/
@Test
- public void testReadOverSplitBoundariesUnaligned() {
- try {
- final String myString = "value1\nvalue2\nvalue3";
- final FileInputSplit split = createTempFile(myString);
-
- FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
- FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
-
- final Configuration parameters = new Configuration();
-
- format.configure(parameters);
- format.open(split1);
-
- assertEquals("value1", format.nextRecord(null));
- assertEquals("value2", format.nextRecord(null));
- assertNull(format.nextRecord(null));
- assertTrue(format.reachedEnd());
-
- format.close();
- format.open(split2);
+ public void testReadOverSplitBoundariesUnaligned() throws IOException {
+ final String myString = "value1\nvalue2\nvalue3";
+ final FileInputSplit split = createTempFile(myString);
- assertEquals("value3", format.nextRecord(null));
- assertNull(format.nextRecord(null));
- assertTrue(format.reachedEnd());
-
- format.close();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+ FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+
+ final Configuration parameters = new Configuration();
+
+ format.configure(parameters);
+ format.open(split1);
+
+ assertEquals("value1", format.nextRecord(null));
+ assertEquals("value2", format.nextRecord(null));
+ assertNull(format.nextRecord(null));
+ assertTrue(format.reachedEnd());
+
+ format.close();
+ format.open(split2);
+
+ assertEquals("value3", format.nextRecord(null));
+ assertNull(format.nextRecord(null));
+ assertTrue(format.reachedEnd());
+
+ format.close();
}
/**
* Tests that the correct number of records is read when the split boundary is exact at the record boundary.
*/
@Test
- public void testReadWithBufferSizeIsMultple() {
- try {
- final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
- final FileInputSplit split = createTempFile(myString);
-
- FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
- FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+ public void testReadWithBufferSizeIsMultiple() throws IOException {
+ final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
+ final FileInputSplit split = createTempFile(myString);
- final Configuration parameters = new Configuration();
+ FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+ FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
- format.setBufferSize(2 * ((int) split1.getLength()));
- format.configure(parameters);
+ final Configuration parameters = new Configuration();
- String next;
- int count = 0;
+ format.setBufferSize(2 * ((int) split1.getLength()));
+ format.configure(parameters);
- // read split 1
- format.open(split1);
- while ((next = format.nextRecord(null)) != null) {
- assertEquals(7, next.length());
- count++;
- }
- assertNull(format.nextRecord(null));
- assertTrue(format.reachedEnd());
- format.close();
-
- // this one must have read one too many, because the next split will skipp the trailing remainder
- // which happens to be one full record
- assertEquals(3, count);
-
- // read split 2
- format.open(split2);
- while ((next = format.nextRecord(null)) != null) {
- assertEquals(7, next.length());
- count++;
- }
- format.close();
+ String next;
+ int count = 0;
- assertEquals(4, count);
+ // read split 1
+ format.open(split1);
+ while ((next = format.nextRecord(null)) != null) {
+ assertEquals(7, next.length());
+ count++;
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ assertNull(format.nextRecord(null));
+ assertTrue(format.reachedEnd());
+ format.close();
+
+ // this one must have read one too many, because the next split will skip the trailing remainder
+ // which happens to be one full record
+ assertEquals(3, count);
+
+ // read split 2
+ format.open(split2);
+ while ((next = format.nextRecord(null)) != null) {
+ assertEquals(7, next.length());
+ count++;
}
+ format.close();
+
+ assertEquals(4, count);
}
@Test
- public void testReadExactlyBufferSize() {
- try {
- final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
-
- final FileInputSplit split = createTempFile(myString);
- final Configuration parameters = new Configuration();
-
- format.setBufferSize((int) split.getLength());
- format.configure(parameters);
- format.open(split);
+ public void testReadExactlyBufferSize() throws IOException {
+ final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
- String next;
- int count = 0;
- while ((next = format.nextRecord(null)) != null) {
- assertEquals(7, next.length());
- count++;
- }
- assertNull(format.nextRecord(null));
- assertTrue(format.reachedEnd());
+ final FileInputSplit split = createTempFile(myString);
+ final Configuration parameters = new Configuration();
- format.close();
-
- assertEquals(4, count);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ format.setBufferSize((int) split.getLength());
+ format.configure(parameters);
+ format.open(split);
+
+ String next;
+ int count = 0;
+ while ((next = format.nextRecord(null)) != null) {
+ assertEquals(7, next.length());
+ count++;
}
+ assertNull(format.nextRecord(null));
+ assertTrue(format.reachedEnd());
+
+ format.close();
+
+ assertEquals(4, count);
}
@Test
- public void testReadRecordsLargerThanBuffer() {
- try {
- final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" +
- "bbbbbbbbbbbbbbbbbbbbbbbbb\n" +
- "ccccccccccccccccccc\n" +
- "ddddddddddddddddddddddddddddddddddd\n";
-
- final FileInputSplit split = createTempFile(myString);
- FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
- FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
-
- final Configuration parameters = new Configuration();
-
- format.setBufferSize(8);
- format.configure(parameters);
-
- String next;
- List<String> result = new ArrayList<String>();
-
-
- format.open(split1);
- while ((next = format.nextRecord(null)) != null) {
- result.add(next);
- }
- assertNull(format.nextRecord(null));
- assertTrue(format.reachedEnd());
- format.close();
+ public void testReadRecordsLargerThanBuffer() throws IOException {
+ final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" +
+ "bbbbbbbbbbbbbbbbbbbbbbbbb\n" +
+ "ccccccccccccccccccc\n" +
+ "ddddddddddddddddddddddddddddddddddd\n";
- format.open(split2);
- while ((next = format.nextRecord(null)) != null) {
- result.add(next);
- }
- assertNull(format.nextRecord(null));
- assertTrue(format.reachedEnd());
- format.close();
-
- assertEquals(4, result.size());
- assertEquals(Arrays.asList(myString.split("\n")), result);
+ final FileInputSplit split = createTempFile(myString);
+ FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+ FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+
+ final Configuration parameters = new Configuration();
+
+ format.setBufferSize(8);
+ format.configure(parameters);
+
+ String next;
+ List<String> result = new ArrayList<String>();
+
+
+ format.open(split1);
+ while ((next = format.nextRecord(null)) != null) {
+ result.add(next);
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ assertNull(format.nextRecord(null));
+ assertTrue(format.reachedEnd());
+ format.close();
+
+ format.open(split2);
+ while ((next = format.nextRecord(null)) != null) {
+ result.add(next);
}
+ assertNull(format.nextRecord(null));
+ assertTrue(format.reachedEnd());
+ format.close();
+
+ assertEquals(4, result.size());
+ assertEquals(Arrays.asList(myString.split("\n")), result);
}
static FileInputSplit createTempFile(String contents) throws IOException {