You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 15:15:25 UTC

[1/3] flink git commit: [core] Add tests for DelimitedInputFormat's handling of records across split boundaries

Repository: flink
Updated Branches:
  refs/heads/master 61b1c0a6c -> d94dfde57


[core] Add tests for DelimitedInputFormat's handling of records across split boundaries


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d94dfde5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d94dfde5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d94dfde5

Branch: refs/heads/master
Commit: d94dfde570632e6114dbca44c6464f204c198866
Parents: efa62df
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 12 21:34:32 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 13 15:14:41 2015 +0200

----------------------------------------------------------------------
 .../api/common/io/DelimitedInputFormat.java     |  18 +-
 .../api/common/io/DelimitedInputFormatTest.java | 270 +++++++++++++++----
 2 files changed, 239 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d94dfde5/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 7fc42ab..a1b045f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -80,7 +80,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	
 	static { loadGloablConfigParams(); }
 	
-	protected static final void loadGloablConfigParams() {
+	protected static void loadGloablConfigParams() {
 		int maxSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY,
 				ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_LINE_SAMPLES);
 		int minSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
@@ -570,9 +570,19 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 				return true;
 			}
 		}
+		
 		// else ..
-		int toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength;
-		if (this.splitLength <= 0) {
+		int toRead;
+		if (this.splitLength > 0) {
+			// if we have more data, read that
+			toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength;
+		}
+		else {
+			// if we have exhausted our split, we need to complete the current record, or read one
+			// more across the next split.
+			// the reason is that the next split will skip over the beginning until it finds the first
+			// delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
+			// previous split.
 			toRead = this.readBuffer.length;
 			this.overLimit = true;
 		}
@@ -592,7 +602,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	}
 	
 	// ============================================================================================
-	//  Parameterization via configuration
+	//  Parametrization via configuration
 	// ============================================================================================
 	
 	// ------------------------------------- Config Keys ------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d94dfde5/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 4af394f..599a640 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -34,6 +33,9 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -45,10 +47,6 @@ import org.junit.Test;
 
 public class DelimitedInputFormatTest {
 	
-	protected Configuration config;
-	
-	protected File tempFile;
-	
 	private final DelimitedInputFormat<String> format = new MyTextInputFormat();
 	
 	// --------------------------------------------------------------------------------------------
@@ -56,7 +54,6 @@ public class DelimitedInputFormatTest {
 	@Before
 	public void setup() {
 		this.format.setFilePath(new Path("file:///some/file/that/will/not/be/read"));
-		this.config = new Configuration();
 	}
 	
 	@After
@@ -64,22 +61,20 @@ public class DelimitedInputFormatTest {
 		if (this.format != null) {
 			this.format.close();
 		}
-		if (this.tempFile != null) {
-			this.tempFile.delete();
-		}
 	}
 
 	// --------------------------------------------------------------------------------------------
 	// --------------------------------------------------------------------------------------------
 	@Test
 	public void testConfigure() {
-		this.config.setString("delimited-format.delimiter", "\n");
+		Configuration cfg = new Configuration();
+		cfg.setString("delimited-format.delimiter", "\n");
 		
-		format.configure(this.config);
+		format.configure(cfg);
 		assertEquals("\n", new String(format.getDelimiter()));
 
-		this.config.setString("delimited-format.delimiter", "&-&");
-		format.configure(this.config);
+		cfg.setString("delimited-format.delimiter", "&-&");
+		format.configure(cfg);
 		assertEquals("&-&", new String(format.getDelimiter()));
 	}
 	
@@ -125,11 +120,58 @@ public class DelimitedInputFormatTest {
 		assertEquals(bufferSize, format.getBufferSize());
 	}
 
-	/**
-	 * Tests simple delimited parsing with a custom delimiter.
-	 */
 	@Test
-	public void testRead() {
+	public void testReadWithoutTrailingDelimiter() throws IOException {
+		// 2. test case
+		final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
+		final FileInputSplit split = createTempFile(myString);
+
+		final Configuration parameters = new Configuration();
+		// default delimiter = '\n'
+
+		format.configure(parameters);
+		format.open(split);
+
+		String first = format.nextRecord(null);
+		String second = format.nextRecord(null);
+
+		assertNotNull(first);
+		assertNotNull(second);
+
+		assertEquals("my key|my val$$$my key2", first);
+		assertEquals("$$ctd.$$|my value2", second);
+
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+	}
+	
+	@Test
+	public void testReadWithTrailingDelimiter() throws IOException {
+		// 2. test case
+		final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2\n";
+		final FileInputSplit split = createTempFile(myString);
+
+		final Configuration parameters = new Configuration();
+		// default delimiter = '\n'
+
+		format.configure(parameters);
+		format.open(split);
+
+		String first = format.nextRecord(null);
+		String second = format.nextRecord(null);
+
+		assertNotNull(first);
+		assertNotNull(second);
+
+		assertEquals("my key|my val$$$my key2", first);
+		assertEquals("$$ctd.$$|my value2", second);
+
+		assertNull(format.nextRecord(null));
+		assertTrue(format.reachedEnd());
+	}
+	
+	@Test
+	public void testReadCustomDelimiter() {
 		try {
 			final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
 			final FileInputSplit split = createTempFile(myString);
@@ -156,42 +198,180 @@ public class DelimitedInputFormatTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
+	/**
+	 * Tests that the records are read correctly when the split boundary is in the middle of a record.
+	 */
 	@Test
-	public void testRead2() throws IOException {
-		// 2. test case
-		final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2\n";
-		final FileInputSplit split = createTempFile(myString);
-		
-		final Configuration parameters = new Configuration();
-		// default delimiter = '\n'
-		
-		format.configure(parameters);
-		format.open(split);
+	public void testReadOverSplitBoundariesUnaligned() {
+		try {
+			final String myString = "value1\nvalue2\nvalue3";
+			final FileInputSplit split = createTempFile(myString);
+			
+			FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+			FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
 
-		String first = format.nextRecord(null);
-		String second = format.nextRecord(null);
-		
-		assertNotNull(first);
-		assertNotNull(second);
-		
-		assertEquals("my key|my val$$$my key2", first);
-		assertEquals("$$ctd.$$|my value2", second);
-		
-		assertNull(format.nextRecord(null));
-		assertTrue(format.reachedEnd());
+			final Configuration parameters = new Configuration();
+			
+			format.configure(parameters);
+			format.open(split1);
+			
+			assertEquals("value1", format.nextRecord(null));
+			assertEquals("value2", format.nextRecord(null));
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+			
+			format.close();
+			format.open(split2);
+
+			assertEquals("value3", format.nextRecord(null));
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+			
+			format.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests that the correct number of records is read when the split boundary is exactly at the record boundary.
+	 */
+	@Test
+	public void testReadWithBufferSizeIsMultple() {
+		try {
+			final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
+			final FileInputSplit split = createTempFile(myString);
+
+			FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+			FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+
+			final Configuration parameters = new Configuration();
+
+			format.setBufferSize(2 * ((int) split1.getLength()));
+			format.configure(parameters);
+
+			String next;
+			int count = 0;
+
+			// read split 1
+			format.open(split1);
+			while ((next = format.nextRecord(null)) != null) {
+				assertEquals(7, next.length());
+				count++;
+			}
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+			format.close();
+			
+			// this one must have read one too many, because the next split will skip the trailing remainder
+			// which happens to be one full record
+			assertEquals(3, count);
+
+			// read split 2
+			format.open(split2);
+			while ((next = format.nextRecord(null)) != null) {
+				assertEquals(7, next.length());
+				count++;
+			}
+			format.close();
+
+			assertEquals(4, count);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testReadExactlyBufferSize() {
+		try {
+			final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
+			
+			final FileInputSplit split = createTempFile(myString);
+			final Configuration parameters = new Configuration();
+			
+			format.setBufferSize((int) split.getLength());
+			format.configure(parameters);
+			format.open(split);
+
+			String next;
+			int count = 0;
+			while ((next = format.nextRecord(null)) != null) {
+				assertEquals(7, next.length());
+				count++;
+			}
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+
+			format.close();
+			
+			assertEquals(4, count);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testReadRecordsLargerThanBuffer() {
+		try {
+			final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" +
+									"bbbbbbbbbbbbbbbbbbbbbbbbb\n" +
+									"ccccccccccccccccccc\n" +
+									"ddddddddddddddddddddddddddddddddddd\n";
+
+			final FileInputSplit split = createTempFile(myString);
+			FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+			FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+			
+			final Configuration parameters = new Configuration();
+
+			format.setBufferSize(8);
+			format.configure(parameters);
+
+			String next;
+			List<String> result = new ArrayList<String>();
+			
+			
+			format.open(split1);
+			while ((next = format.nextRecord(null)) != null) {
+				result.add(next);
+			}
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+			format.close();
+
+			format.open(split2);
+			while ((next = format.nextRecord(null)) != null) {
+				result.add(next);
+			}
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+			format.close();
+			
+			assertEquals(4, result.size());
+			assertEquals(Arrays.asList(myString.split("\n")), result);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 	
-	
-	private FileInputSplit createTempFile(String contents) throws IOException {
-		this.tempFile = File.createTempFile("test_contents", "tmp");
-		this.tempFile.deleteOnExit();
+	private static FileInputSplit createTempFile(String contents) throws IOException {
+		File tempFile = File.createTempFile("test_contents", "tmp");
+		tempFile.deleteOnExit();
 		
-		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(this.tempFile));
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
 		wrt.write(contents);
 		wrt.close();
 		
-		return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
+		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
 	}
 	
 	


[3/3] flink git commit: [core] Removed redundant common.io.FormatUtil

Posted by se...@apache.org.
[core] Removed redundant common.io.FormatUtil


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d5d63f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d5d63f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d5d63f7

Branch: refs/heads/master
Commit: 3d5d63f7f80f3dfb353a86b10cfbed653b24421f
Parents: 61b1c0a
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 12 19:42:49 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 13 15:14:41 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/api/common/io/FormatUtil.java  | 186 -------------------
 .../api/common/io/SerializedFormatTest.java     |  21 ++-
 2 files changed, 15 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d5d63f7/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
deleted file mode 100644
index 83860a5..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.io;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.util.ReflectionUtil;
-
-/**
- * Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}.
- */
-public class FormatUtil {
-
-
-	/**
-	 * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration}
-	 * initializes the format.
-	 * 
-	 * @param <T>
-	 *        the class of the InputFormat
-	 * @param inputFormatClass
-	 *        the class of the InputFormat
-	 * @param path
-	 *        the path of the file
-	 * @param configuration
-	 *        optional configuration of the InputFormat
-	 * @return the created {@link InputFormat}
-	 * @throws IOException
-	 *         if an I/O error occurred while accessing the file or initializing the InputFormat.
-	 */
-	public static <T, F extends FileInputFormat<T>> F openInput(
-			Class<F> inputFormatClass, String path, Configuration configuration)
-		throws IOException
-	{
-		configuration = configuration == null ? new Configuration() : configuration;
-
-		Path normalizedPath = normalizePath(new Path(path));
-		final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
-
-		inputFormat.setFilePath(normalizedPath);
-		inputFormat.setOpenTimeout(0);
-		inputFormat.configure(configuration);
-
-		final FileSystem fs = FileSystem.get(normalizedPath.toUri());
-		FileStatus fileStatus = fs.getFileStatus(normalizedPath);
-
-		BlockLocation[] blocks = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-		inputFormat.open(new FileInputSplit(0, new Path(path), 0, fileStatus.getLen(), blocks[0].getHosts()));
-		return inputFormat;
-	}
-
-	/**
-	 * Creates {@link InputFormat}s from a given class for the specified file(s). The optional {@link Configuration}
-	 * initializes the formats.
-	 * 
-	 * @param <T>
-	 *        the class of the InputFormat
-	 * @param inputFormatClass
-	 *        the class of the InputFormat
-	 * @param path
-	 *        the path of the file or to the directory containing the splits
-	 * @param configuration
-	 *        optional configuration of the InputFormat
-	 * @return the created {@link InputFormat}s for each file in the specified path
-	 * @throws IOException
-	 *         if an I/O error occurred while accessing the files or initializing the InputFormat.
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T, F extends FileInputFormat<T>> List<F> openAllInputs(
-			Class<F> inputFormatClass, String path, Configuration configuration) throws IOException {
-		Path nephelePath = new Path(path);
-		FileSystem fs = nephelePath.getFileSystem();
-		FileStatus fileStatus = fs.getFileStatus(nephelePath);
-		if (!fileStatus.isDir()) {
-			return Arrays.asList(openInput(inputFormatClass, path, configuration));
-		}
-		FileStatus[] list = fs.listStatus(nephelePath);
-		List<F> formats = new ArrayList<F>();
-		for (int index = 0; index < list.length; index++) {
-			formats.add(openInput(inputFormatClass, list[index].getPath().toString(), configuration));
-		}
-		return formats;
-	}
-
-	/**
-	 * Creates an {@link InputFormat} from a given class. The optional {@link Configuration}
-	 * initializes the format.
-	 * 
-	 * @param <T>
-	 *        the class of the InputFormat
-	 * @param inputFormatClass
-	 *        the class of the InputFormat
-	 * @param configuration
-	 *        optional configuration of the InputFormat
-	 * @return the created {@link InputFormat}
-	 * @throws IOException
-	 *         if an I/O error occurred while accessing the file or initializing the InputFormat.
-	 */
-	public static <T, IS extends InputSplit, F extends InputFormat<T, IS>> F openInput(
-			Class<F> inputFormatClass, Configuration configuration) throws IOException {
-		configuration = configuration == null ? new Configuration() : configuration;
-
-		final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
-		inputFormat.configure(configuration);
-		final IS[] splits = inputFormat.createInputSplits(1);
-		inputFormat.open(splits[0]);
-		return inputFormat;
-	}
-	
-	/**
-	 * Creates an {@link OutputFormat} from a given class for the specified file. The optional {@link Configuration}
-	 * initializes the format.
-	 * 
-	 * @param <T>
-	 *        the class of the OutputFormat
-	 * @param outputFormatClass
-	 *        the class of the OutputFormat
-	 * @param path
-	 *        the path of the file or to the directory containing the splits
-	 * @param configuration
-	 *        optional configuration of the OutputFormat
-	 * @return the created {@link OutputFormat}
-	 * @throws IOException
-	 *         if an I/O error occurred while accessing the file or initializing the OutputFormat.
-	 */
-	public static <T, F extends FileOutputFormat<? extends T>> F openOutput(
-			Class<F> outputFormatClass, String path, Configuration configuration) 
-		throws IOException
-	{
-		final F outputFormat = ReflectionUtil.newInstance(outputFormatClass);
-		outputFormat.setOutputFilePath(new Path(path));
-		outputFormat.setWriteMode(WriteMode.OVERWRITE);
-	
-		configuration = configuration == null ? new Configuration() : configuration;
-		
-		outputFormat.configure(configuration);
-		outputFormat.open(0, 1);
-		return outputFormat;
-	}
-
-	/**
-	 * Fixes the path if it denotes a local (relative) file without the proper protocol prefix.
-	 */
-	private static Path normalizePath(Path path) {
-		URI uri = path.toUri();
-		if (uri.getScheme() == null) {
-			try {
-				uri = new URI("file", uri.getHost(), uri.getPath(), uri.getFragment());
-				path = new Path(uri.toString());
-			} catch (URISyntaxException e) {
-				throw new IllegalArgumentException("path is invalid", e);
-			}
-		}
-		return path;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d5d63f7/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
index e421f4f..d82623b 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
@@ -19,10 +19,13 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.runner.RunWith;
@@ -56,13 +59,19 @@ public class SerializedFormatTest extends SequentialFormatTestBase<Record> {
 		return inputFormat;
 	}
 
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
+	
 	@Override
-	protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration
-			configuration) throws IOException {
-		return FormatUtil.<Record, SerializedOutputFormat>openOutput
-				(SerializedOutputFormat.class, path, configuration);
+	protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration configuration)
+			throws IOException
+	{
+		final SerializedOutputFormat<Record> outputFormat = new SerializedOutputFormat<Record>();
+		outputFormat.setOutputFilePath(new Path(path));
+		outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+
+		configuration = configuration == null ? new Configuration() : configuration;
+		outputFormat.configure(configuration);
+		outputFormat.open(0, 1);
+		return outputFormat;
 	}
 
 	@Override


[2/3] flink git commit: [core] Minor cleanups in FileInputFormat and DelimitedInputFormat

Posted by se...@apache.org.
[core] Minor cleanups in FileInputFormat and DelimitedInputFormat


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efa62df8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efa62df8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efa62df8

Branch: refs/heads/master
Commit: efa62df879c608daa1750797e2a34ffa928592f9
Parents: 3d5d63f
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 12 20:26:20 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 13 15:14:41 2015 +0200

----------------------------------------------------------------------
 .../api/common/io/DelimitedInputFormat.java     |  9 +-
 .../flink/api/common/io/FileInputFormat.java    | 12 ++-
 .../api/common/io/DelimitedInputFormatTest.java | 98 ++++++++++----------
 .../api/common/io/FileInputFormatTest.java      | 39 ++++++--
 4 files changed, 91 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index d2b4e83..7fc42ab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
 import java.io.IOException;
@@ -40,8 +39,8 @@ import com.google.common.base.Charsets;
  * Base implementation for input formats that split the input at a delimiter into records.
  * The parsing of the record bytes into the record has to be implemented in the
  * {@link #readRecord(Object, byte[], int, int)} method.
- * <p>
- * The default delimiter is the newline character {@code '\n'}.
+ * 
+ * <p>The default delimiter is the newline character {@code '\n'}.</p>
  */
 public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	
@@ -495,7 +494,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 			}
 
 			int startPos = this.readPos;
-			int count = 0;
+			int count;
 
 			while (this.readPos < this.limit && i < this.delimiter.length) {
 				if ((this.readBuffer[this.readPos++]) == this.delimiter[i]) {
@@ -559,7 +558,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 
 	private boolean fillBuffer() throws IOException {
 		// special case for reading the whole split.
-		if(this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
+		if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
 			int read = this.stream.read(this.readBuffer, 0, readBuffer.length);
 			if (read == -1) {
 				this.stream.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 0584b96..c9c9ec1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -47,12 +47,12 @@ import org.apache.flink.core.fs.Path;
 
 /**
  * The base class for {@link InputFormat}s that read from files. For specific input types the 
- * <tt>nextRecord()</tt> and <tt>reachedEnd()</tt> methods need to be implemented.
+ * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
  * Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to
  * change the life cycle behavior.
- * <p>
- * After the {@link #open(FileInputSplit)} method completed, the file input data is available
- * from the {@link #stream} field.
+ * 
+ * <p>After the {@link #open(FileInputSplit)} method completed, the file input data is available
+ * from the {@link #stream} field.</p>
  */
 public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSplit> {
 	
@@ -245,6 +245,8 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
 		// TODO The job-submission web interface passes empty args (and thus empty
 		// paths) to compute the preview graph. The following is a workaround for
 		// this situation and we should fix this.
+		
+		// comment (Stephan Ewen) this should be no longer relevant with the current Java/Scala APIs.
 		if (filePath.isEmpty()) {
 			setFilePath(new Path());
 			return;
@@ -609,7 +611,7 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
 	 * The method may be overridden. Hadoop's FileInputFormat has a similar mechanism and applies the
 	 * same filters by default.
 	 * 
-	 * @param fileStatus
+	 * @param fileStatus The file status to check.
 	 * @return true, if the given file or directory is accepted
 	 */
 	protected boolean acceptFile(FileStatus fileStatus) {

http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 9489f16..4af394f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -37,8 +38,7 @@ import java.io.OutputStreamWriter;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,7 +49,7 @@ public class DelimitedInputFormatTest {
 	
 	protected File tempFile;
 	
-	private final DelimitedInputFormat<Record> format = new MyTextInputFormat();
+	private final DelimitedInputFormat<String> format = new MyTextInputFormat();
 	
 	// --------------------------------------------------------------------------------------------
 
@@ -90,7 +90,7 @@ public class DelimitedInputFormatTest {
 		final int LINE_LENGTH_LIMIT = 12345;
 		final int BUFFER_SIZE = 178;
 		
-		DelimitedInputFormat<Record> format = new MyTextInputFormat();
+		DelimitedInputFormat<String> format = new MyTextInputFormat();
 		format.setDelimiter(DELIMITER);
 		format.setNumLineSamples(NUM_LINE_SAMPLES);
 		format.setLineLengthLimit(LINE_LENGTH_LIMIT);
@@ -104,7 +104,7 @@ public class DelimitedInputFormatTest {
 		
 		ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
 		@SuppressWarnings("unchecked")
-		DelimitedInputFormat<Record> deserialized = (DelimitedInputFormat<Record>) ois.readObject();
+		DelimitedInputFormat<String> deserialized = (DelimitedInputFormat<String>) ois.readObject();
 		
 		assertEquals(NUM_LINE_SAMPLES, deserialized.getNumLineSamples());
 		assertEquals(LINE_LENGTH_LIMIT, deserialized.getLineLengthLimit());
@@ -115,7 +115,7 @@ public class DelimitedInputFormatTest {
 	@Test
 	public void testOpen() throws IOException {
 		final String myString = "my mocked line 1\nmy mocked line 2\n";
-		final FileInputSplit split = createTempFile(myString);	
+		final FileInputSplit split = createTempFile(myString);
 		
 		int bufferSize = 5;
 		format.setBufferSize(bufferSize);
@@ -125,35 +125,42 @@ public class DelimitedInputFormatTest {
 		assertEquals(bufferSize, format.getBufferSize());
 	}
 
+	/**
+	 * Tests simple delimited parsing with a custom delimiter.
+	 */
 	@Test
-	public void testRead() throws IOException {
-		final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
-		final FileInputSplit split = createTempFile(myString);
-		
-		final Configuration parameters = new Configuration();
-		
-		format.setDelimiter("$$$");
-		format.configure(parameters);
-		format.open(split);
-		
-		Record theRecord = new Record();
+	public void testRead() {
+		try {
+			final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
+			final FileInputSplit split = createTempFile(myString);
+			
+			final Configuration parameters = new Configuration();
+			
+			format.setDelimiter("$$$");
+			format.configure(parameters);
+			format.open(split);
+	
+			String first = format.nextRecord(null);
+			assertNotNull(first);
+			assertEquals("my key|my val", first);
 
-		assertNotNull(format.nextRecord(theRecord));
-		assertEquals("my key", theRecord.getField(0, StringValue.class).getValue());
-		assertEquals("my val", theRecord.getField(1, StringValue.class).getValue());
-		
-		assertNotNull(format.nextRecord(theRecord));
-		assertEquals("my key2\n$$ctd.$$", theRecord.getField(0, StringValue.class).getValue());
-		assertEquals("my value2", theRecord.getField(1, StringValue.class).getValue());
-		
-		assertNull(format.nextRecord(theRecord));
-		assertTrue(format.reachedEnd());
+			String second = format.nextRecord(null);
+			assertNotNull(second);
+			assertEquals("my key2\n$$ctd.$$|my value2", second);
+			
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 	
 	@Test
 	public void testRead2() throws IOException {
 		// 2. test case
-		final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2";
+		final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2\n";
 		final FileInputSplit split = createTempFile(myString);
 		
 		final Configuration parameters = new Configuration();
@@ -162,20 +169,20 @@ public class DelimitedInputFormatTest {
 		format.configure(parameters);
 		format.open(split);
 
-		Record theRecord = new Record();
-
-		assertNotNull(format.nextRecord(theRecord));
-		assertEquals("my key", theRecord.getField(0, StringValue.class).getValue());
-		assertEquals("my val$$$my key2", theRecord.getField(1, StringValue.class).getValue());
+		String first = format.nextRecord(null);
+		String second = format.nextRecord(null);
 		
-		assertNotNull(format.nextRecord(theRecord));
-		assertEquals("$$ctd.$$", theRecord.getField(0, StringValue.class).getValue());
-		assertEquals("my value2", theRecord.getField(1, StringValue.class).getValue());
+		assertNotNull(first);
+		assertNotNull(second);
 		
-		assertNull(format.nextRecord(theRecord));
+		assertEquals("my key|my val$$$my key2", first);
+		assertEquals("$$ctd.$$|my value2", second);
+		
+		assertNull(format.nextRecord(null));
 		assertTrue(format.reachedEnd());
 	}
 	
+	
 	private FileInputSplit createTempFile(String contents) throws IOException {
 		this.tempFile = File.createTempFile("test_contents", "tmp");
 		this.tempFile.deleteOnExit();
@@ -187,22 +194,13 @@ public class DelimitedInputFormatTest {
 		return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
 	}
 	
-	protected static final class MyTextInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat<Record> {
+	
+	protected static final class MyTextInputFormat extends DelimitedInputFormat<String> {
 		private static final long serialVersionUID = 1L;
 		
-		private final StringValue str1 = new StringValue();
-		private final StringValue str2 = new StringValue();
-		
 		@Override
-		public Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes) {
-			String theRecord = new String(bytes, offset, numBytes);
-			
-			str1.setValue(theRecord.substring(0, theRecord.indexOf('|')));
-			str2.setValue(theRecord.substring(theRecord.indexOf('|') + 1));
-			
-			reuse.setField(0, str1);
-			reuse.setField(1, str2);
-			return reuse;
+		public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) {
+			return new String(bytes, offset, numBytes);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 7e3edc0..5aac540 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.testutils.TestFileUtils;
 import org.apache.flink.types.IntValue;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -37,8 +38,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the FileInputFormat
+ */
 public class FileInputFormatTest {
 
+	// ------------------------------------------------------------------------
+	//  Statistics
+	// ------------------------------------------------------------------------
+	
 	@Test
 	public void testGetStatisticsNonExistingFile() {
 		try {
@@ -189,6 +199,10 @@ public class FileInputFormatTest {
 			Assert.fail(ex.getMessage());
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  Unsplittable input files
+	// ------------------------------------------------------------------------
 	
 	// ---- Tests for .deflate ---------
 	
@@ -234,6 +248,10 @@ public class FileInputFormatTest {
 			Assert.fail(ex.getMessage());
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  Ignored Files
+	// ------------------------------------------------------------------------
 	
 	@Test
 	public void testIgnoredUnderscoreFiles() {
@@ -243,11 +261,13 @@ public class FileInputFormatTest {
 			// create some accepted, some ignored files
 			
 			File tempDir = new File(System.getProperty("java.io.tmpdir"));
-			File f = null;
+			File f;
 			do {
 				f = new File(tempDir, TestFileUtils.randomFileName(""));
-			} while (f.exists());
-			f.mkdirs();
+			}
+			while (f.exists());
+
+			assertTrue(f.mkdirs());
 			f.deleteOnExit();
 			
 			File child1 = new File(f, "dataFile1.txt");
@@ -301,11 +321,13 @@ public class FileInputFormatTest {
 
 			// create two accepted and two ignored files
 			File tempDir = new File(System.getProperty("java.io.tmpdir"));
-			File f = null;
+			File f;
 			do {
 				f = new File(tempDir, TestFileUtils.randomFileName(""));
-			} while (f.exists());
-			f.mkdirs();
+			}
+			while (f.exists());
+			
+			assertTrue(f.mkdirs());
 			f.deleteOnExit();
 
 			File child1 = new File(f, "dataFile1.txt");
@@ -342,7 +364,10 @@ public class FileInputFormatTest {
 		}
 	}
 
-
+	// ------------------------------------------------------------------------
+	//  Stream Decoration
+	// ------------------------------------------------------------------------
+	
 	@Test
 	public void testDecorateInputStream() throws IOException {
 		// create temporary file with 3 blocks