You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 15:15:26 UTC
[2/3] flink git commit: [core] Minor cleanups in FileInputFormat and
DelimitedInputFormat
[core] Minor cleanups in FileInputFormat and DelimitedInputFormat
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efa62df8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efa62df8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efa62df8
Branch: refs/heads/master
Commit: efa62df879c608daa1750797e2a34ffa928592f9
Parents: 3d5d63f
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 12 20:26:20 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 13 15:14:41 2015 +0200
----------------------------------------------------------------------
.../api/common/io/DelimitedInputFormat.java | 9 +-
.../flink/api/common/io/FileInputFormat.java | 12 ++-
.../api/common/io/DelimitedInputFormatTest.java | 98 ++++++++++----------
.../api/common/io/FileInputFormatTest.java | 39 ++++++--
4 files changed, 91 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index d2b4e83..7fc42ab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.io;
import java.io.IOException;
@@ -40,8 +39,8 @@ import com.google.common.base.Charsets;
* Base implementation for input formats that split the input at a delimiter into records.
* The parsing of the record bytes into the record has to be implemented in the
* {@link #readRecord(Object, byte[], int, int)} method.
- * <p>
- * The default delimiter is the newline character {@code '\n'}.
+ *
+ * <p>The default delimiter is the newline character {@code '\n'}.</p>
*/
public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
@@ -495,7 +494,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
}
int startPos = this.readPos;
- int count = 0;
+ int count;
while (this.readPos < this.limit && i < this.delimiter.length) {
if ((this.readBuffer[this.readPos++]) == this.delimiter[i]) {
@@ -559,7 +558,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
private boolean fillBuffer() throws IOException {
// special case for reading the whole split.
- if(this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
+ if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
int read = this.stream.read(this.readBuffer, 0, readBuffer.length);
if (read == -1) {
this.stream.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 0584b96..c9c9ec1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -47,12 +47,12 @@ import org.apache.flink.core.fs.Path;
/**
* The base class for {@link InputFormat}s that read from files. For specific input types the
- * <tt>nextRecord()</tt> and <tt>reachedEnd()</tt> methods need to be implemented.
+ * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
* Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to
* change the life cycle behavior.
- * <p>
- * After the {@link #open(FileInputSplit)} method completed, the file input data is available
- * from the {@link #stream} field.
+ *
+ * <p>After the {@link #open(FileInputSplit)} method completed, the file input data is available
+ * from the {@link #stream} field.</p>
*/
public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSplit> {
@@ -245,6 +245,8 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
// TODO The job-submission web interface passes empty args (and thus empty
// paths) to compute the preview graph. The following is a workaround for
// this situation and we should fix this.
+
+ // NOTE (Stephan Ewen): this should no longer be relevant with the current Java/Scala APIs.
if (filePath.isEmpty()) {
setFilePath(new Path());
return;
@@ -609,7 +611,7 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
* The method may be overridden. Hadoop's FileInputFormat has a similar mechanism and applies the
* same filters by default.
*
- * @param fileStatus
+ * @param fileStatus The file status to check.
* @return true, if the given file or directory is accepted
*/
protected boolean acceptFile(FileStatus fileStatus) {
http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 9489f16..4af394f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -37,8 +38,7 @@ import java.io.OutputStreamWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -49,7 +49,7 @@ public class DelimitedInputFormatTest {
protected File tempFile;
- private final DelimitedInputFormat<Record> format = new MyTextInputFormat();
+ private final DelimitedInputFormat<String> format = new MyTextInputFormat();
// --------------------------------------------------------------------------------------------
@@ -90,7 +90,7 @@ public class DelimitedInputFormatTest {
final int LINE_LENGTH_LIMIT = 12345;
final int BUFFER_SIZE = 178;
- DelimitedInputFormat<Record> format = new MyTextInputFormat();
+ DelimitedInputFormat<String> format = new MyTextInputFormat();
format.setDelimiter(DELIMITER);
format.setNumLineSamples(NUM_LINE_SAMPLES);
format.setLineLengthLimit(LINE_LENGTH_LIMIT);
@@ -104,7 +104,7 @@ public class DelimitedInputFormatTest {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
@SuppressWarnings("unchecked")
- DelimitedInputFormat<Record> deserialized = (DelimitedInputFormat<Record>) ois.readObject();
+ DelimitedInputFormat<String> deserialized = (DelimitedInputFormat<String>) ois.readObject();
assertEquals(NUM_LINE_SAMPLES, deserialized.getNumLineSamples());
assertEquals(LINE_LENGTH_LIMIT, deserialized.getLineLengthLimit());
@@ -115,7 +115,7 @@ public class DelimitedInputFormatTest {
@Test
public void testOpen() throws IOException {
final String myString = "my mocked line 1\nmy mocked line 2\n";
- final FileInputSplit split = createTempFile(myString);
+ final FileInputSplit split = createTempFile(myString);
int bufferSize = 5;
format.setBufferSize(bufferSize);
@@ -125,35 +125,42 @@ public class DelimitedInputFormatTest {
assertEquals(bufferSize, format.getBufferSize());
}
+ /**
+ * Tests simple delimited parsing with a custom delimiter.
+ */
@Test
- public void testRead() throws IOException {
- final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
- final FileInputSplit split = createTempFile(myString);
-
- final Configuration parameters = new Configuration();
-
- format.setDelimiter("$$$");
- format.configure(parameters);
- format.open(split);
-
- Record theRecord = new Record();
+ public void testRead() {
+ try {
+ final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
+ final FileInputSplit split = createTempFile(myString);
+
+ final Configuration parameters = new Configuration();
+
+ format.setDelimiter("$$$");
+ format.configure(parameters);
+ format.open(split);
+
+ String first = format.nextRecord(null);
+ assertNotNull(first);
+ assertEquals("my key|my val", first);
- assertNotNull(format.nextRecord(theRecord));
- assertEquals("my key", theRecord.getField(0, StringValue.class).getValue());
- assertEquals("my val", theRecord.getField(1, StringValue.class).getValue());
-
- assertNotNull(format.nextRecord(theRecord));
- assertEquals("my key2\n$$ctd.$$", theRecord.getField(0, StringValue.class).getValue());
- assertEquals("my value2", theRecord.getField(1, StringValue.class).getValue());
-
- assertNull(format.nextRecord(theRecord));
- assertTrue(format.reachedEnd());
+ String second = format.nextRecord(null);
+ assertNotNull(second);
+ assertEquals("my key2\n$$ctd.$$|my value2", second);
+
+ assertNull(format.nextRecord(null));
+ assertTrue(format.reachedEnd());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
public void testRead2() throws IOException {
// 2. test case
- final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
+ final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2\n";
final FileInputSplit split = createTempFile(myString);
final Configuration parameters = new Configuration();
@@ -162,20 +169,20 @@ public class DelimitedInputFormatTest {
format.configure(parameters);
format.open(split);
- Record theRecord = new Record();
-
- assertNotNull(format.nextRecord(theRecord));
- assertEquals("my key", theRecord.getField(0, StringValue.class).getValue());
- assertEquals("my val$$$my key2", theRecord.getField(1, StringValue.class).getValue());
+ String first = format.nextRecord(null);
+ String second = format.nextRecord(null);
- assertNotNull(format.nextRecord(theRecord));
- assertEquals("$$ctd.$$", theRecord.getField(0, StringValue.class).getValue());
- assertEquals("my value2", theRecord.getField(1, StringValue.class).getValue());
+ assertNotNull(first);
+ assertNotNull(second);
- assertNull(format.nextRecord(theRecord));
+ assertEquals("my key|my val$$$my key2", first);
+ assertEquals("$$ctd.$$|my value2", second);
+
+ assertNull(format.nextRecord(null));
assertTrue(format.reachedEnd());
}
+
private FileInputSplit createTempFile(String contents) throws IOException {
this.tempFile = File.createTempFile("test_contents", "tmp");
this.tempFile.deleteOnExit();
@@ -187,22 +194,13 @@ public class DelimitedInputFormatTest {
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
}
- protected static final class MyTextInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat<Record> {
+
+ protected static final class MyTextInputFormat extends DelimitedInputFormat<String> {
private static final long serialVersionUID = 1L;
- private final StringValue str1 = new StringValue();
- private final StringValue str2 = new StringValue();
-
@Override
- public Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes) {
- String theRecord = new String(bytes, offset, numBytes);
-
- str1.setValue(theRecord.substring(0, theRecord.indexOf('|')));
- str2.setValue(theRecord.substring(theRecord.indexOf('|') + 1));
-
- reuse.setField(0, str1);
- reuse.setField(1, str2);
- return reuse;
+ public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) {
+ return new String(bytes, offset, numBytes);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 7e3edc0..5aac540 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.testutils.TestFileUtils;
import org.apache.flink.types.IntValue;
+
import org.junit.Assert;
import org.junit.Test;
@@ -37,8 +38,17 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link FileInputFormat}.
+ */
public class FileInputFormatTest {
+ // ------------------------------------------------------------------------
+ // Statistics
+ // ------------------------------------------------------------------------
+
@Test
public void testGetStatisticsNonExistingFile() {
try {
@@ -189,6 +199,10 @@ public class FileInputFormatTest {
Assert.fail(ex.getMessage());
}
}
+
+ // ------------------------------------------------------------------------
+ // Unsplittable input files
+ // ------------------------------------------------------------------------
// ---- Tests for .deflate ---------
@@ -234,6 +248,10 @@ public class FileInputFormatTest {
Assert.fail(ex.getMessage());
}
}
+
+ // ------------------------------------------------------------------------
+ // Ignored Files
+ // ------------------------------------------------------------------------
@Test
public void testIgnoredUnderscoreFiles() {
@@ -243,11 +261,13 @@ public class FileInputFormatTest {
// create some accepted, some ignored files
File tempDir = new File(System.getProperty("java.io.tmpdir"));
- File f = null;
+ File f;
do {
f = new File(tempDir, TestFileUtils.randomFileName(""));
- } while (f.exists());
- f.mkdirs();
+ }
+ while (f.exists());
+
+ assertTrue(f.mkdirs());
f.deleteOnExit();
File child1 = new File(f, "dataFile1.txt");
@@ -301,11 +321,13 @@ public class FileInputFormatTest {
// create two accepted and two ignored files
File tempDir = new File(System.getProperty("java.io.tmpdir"));
- File f = null;
+ File f;
do {
f = new File(tempDir, TestFileUtils.randomFileName(""));
- } while (f.exists());
- f.mkdirs();
+ }
+ while (f.exists());
+
+ assertTrue(f.mkdirs());
f.deleteOnExit();
File child1 = new File(f, "dataFile1.txt");
@@ -342,7 +364,10 @@ public class FileInputFormatTest {
}
}
-
+ // ------------------------------------------------------------------------
+ // Stream Decoration
+ // ------------------------------------------------------------------------
+
@Test
public void testDecorateInputStream() throws IOException {
// create temporary file with 3 blocks