You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/26 01:20:40 UTC

[2/8] flink git commit: [FLINK-2906] Remove Record API

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
deleted file mode 100644
index 89baa98..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings({ "serial", "deprecation" })
-public class ReduceWrappingFunctionTest {
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testWrappedReduceObject() {
-		try {
-			AtomicInteger methodCounter = new AtomicInteger();
-			
-			ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction(methodCounter)).build();
-			
-			RichFunction reducer = (RichFunction) reduceOp.getUserCodeWrapper().getUserCodeObject();
-			
-			// test the method invocations
-			reducer.close();
-			reducer.open(new Configuration());
-			assertEquals(2, methodCounter.get());
-			
-			// prepare the reduce / combine tests
-			final List<Record> target = new ArrayList<Record>();
-			Collector<Record> collector = new Collector<Record>() {
-				@Override
-				public void collect(Record record) {
-					target.add(record);
-				}
-				@Override
-				public void close() {}
-			};
-			
-			List<Record> source = new ArrayList<Record>();
-			source.add(new Record(new IntValue(42), new LongValue(11)));
-			source.add(new Record(new IntValue(13), new LongValue(17)));
-			
-			// test reduce
-			((GroupReduceFunction<Record, Record>) reducer).reduce(source, collector);
-			assertEquals(2, target.size());
-			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
-			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
-			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
-			assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
-			target.clear();
-			
-			// test combine
-			((GroupCombineFunction<Record, Record>) reducer).combine(source, collector);
-			assertEquals(2, target.size());
-			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
-			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
-			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
-			assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
-			target.clear();
-			
-			// test the serialization
-			SerializationUtils.clone((java.io.Serializable) reducer);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testWrappedReduceClass() {
-		try {
-			ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build();
-			
-			UserCodeWrapper<GroupReduceFunction<Record, Record>> udf = reduceOp.getUserCodeWrapper();
-			UserCodeWrapper<GroupReduceFunction<Record, Record>> copy = SerializationUtils.clone(udf);
-			GroupReduceFunction<Record, Record> reducer = copy.getUserCodeObject();
-			
-			// prepare the reduce / combine tests
-			final List<Record> target = new ArrayList<Record>();
-			Collector<Record> collector = new Collector<Record>() {
-				@Override
-				public void collect(Record record) {
-					target.add(record);
-				}
-				@Override
-				public void close() {}
-			};
-			
-			List<Record> source = new ArrayList<Record>();
-			source.add(new Record(new IntValue(42), new LongValue(11)));
-			source.add(new Record(new IntValue(13), new LongValue(17)));
-			
-			// test reduce
-			reducer.reduce(source, collector);
-			assertEquals(2, target.size());
-			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
-			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
-			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
-			assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
-			target.clear();
-			
-			// test combine
-			((GroupCombineFunction<Record, Record>) reducer).combine(source, collector);
-			assertEquals(2, target.size());
-			assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
-			assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
-			assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
-			assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
-			target.clear();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testExtractSemantics() {
-		try {
-			{
-				ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction()).build();
-				
-				SingleInputSemanticProperties props = reduceOp.getSemanticProperties();
-				FieldSet fw2 = props.getForwardingTargetFields(0, 2);
-				FieldSet fw4 = props.getForwardingTargetFields(0, 4);
-				assertNotNull(fw2);
-				assertNotNull(fw4);
-				assertEquals(1, fw2.size());
-				assertEquals(1, fw4.size());
-				assertTrue(fw2.contains(2));
-				assertTrue(fw4.contains(4));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testCombinable() {
-		try {
-			{
-				ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction()).build();
-				assertTrue(reduceOp.isCombinable());
-			}
-			{
-				ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build();
-				assertTrue(reduceOp.isCombinable());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Combinable
-	@ConstantFields({2, 4})
-	public static class TestReduceFunction extends ReduceFunction {
-		
-		private final AtomicInteger methodCounter;
-		
-		private TestReduceFunction(AtomicInteger methodCounter) {
-			this.methodCounter= methodCounter;
-		}
-		
-		public TestReduceFunction() {
-			methodCounter = new AtomicInteger();
-		}
-		
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-			while (records.hasNext()) {
-				out.collect(records.next());
-			}
-		}
-		
-		@Override
-		public void close() throws Exception {
-			methodCounter.incrementAndGet();
-			super.close();
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			methodCounter.incrementAndGet();
-			super.open(parameters);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
deleted file mode 100644
index 5ccfdd9..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.junit.Assert;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CsvInputFormatTest {
-	
-	protected File tempFile;
-	
-	private final CsvInputFormat format = new CsvInputFormat();
-	
-	//Static variables for testing the removal of \r\n to \n
-	private static final String FIRST_PART = "That is the first part";
-		
-	private static final String SECOND_PART = "That is the second part";
-	
-	// --------------------------------------------------------------------------------------------
-	@Before
-	public void setup() {
-		format.setFilePath("file:///some/file/that/will/not/be/read");
-	}
-	
-	@After
-	public void setdown() throws Exception {
-		if (this.format != null) {
-			this.format.close();
-		}
-		if (this.tempFile != null) {
-			this.tempFile.delete();
-		}
-	}
-
-	@Test
-	public void testConfigureEmptyConfig() {
-		try {
-			Configuration config = new Configuration();
-			
-			// empty configuration, plus no fields on the format itself is not valid
-			try {
-				format.configure(config);
-				fail(); // should give an error
-			} catch (IllegalConfigurationException e) {
-				; // okay
-			}
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	@Test
-	public void readWithEmptyFieldInstanceParameters() {
-		try {
-			final String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
-			final FileInputSplit split = createTempFile(fileContent);	
-		
-			final Configuration parameters = new Configuration();
-
-			format.setFieldDelimiter('|');
-			format.setFieldTypes(StringValue.class, StringValue.class, StringValue.class);
-			
-			format.configure(parameters);
-			format.open(split);
-			
-			Record record = new Record();
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals("abc", record.getField(0, StringValue.class).getValue());
-			assertEquals("def", record.getField(1, StringValue.class).getValue());
-			assertEquals("ghijk", record.getField(2, StringValue.class).getValue());
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals("abc", record.getField(0, StringValue.class).getValue());
-			assertEquals("", record.getField(1, StringValue.class).getValue());
-			assertEquals("hhg", record.getField(2, StringValue.class).getValue());
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals("", record.getField(0, StringValue.class).getValue());
-			assertEquals("", record.getField(1, StringValue.class).getValue());
-			assertEquals("", record.getField(2, StringValue.class).getValue());
-		}
-		catch (Exception ex) {
-			ex.printStackTrace();
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	@Test
-	public void readWithEmptyFieldConfigParameters() {
-		try {
-			final String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
-			final FileInputSplit split = createTempFile(fileContent);	
-		
-			final Configuration parameters = new Configuration();
-			new CsvInputFormat.ConfigBuilder(null, parameters)
-				.field(StringValue.class, 0).field(StringValue.class, 1).field(StringValue.class, 2);
-			
-			format.setFieldDelimiter("|");
-			
-			format.configure(parameters);
-			format.open(split);
-			
-			Record record = new Record();
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals("abc", record.getField(0, StringValue.class).getValue());
-			assertEquals("def", record.getField(1, StringValue.class).getValue());
-			assertEquals("ghijk", record.getField(2, StringValue.class).getValue());
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals("abc", record.getField(0, StringValue.class).getValue());
-			assertEquals("", record.getField(1, StringValue.class).getValue());
-			assertEquals("hhg", record.getField(2, StringValue.class).getValue());
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals("", record.getField(0, StringValue.class).getValue());
-			assertEquals("", record.getField(1, StringValue.class).getValue());
-			assertEquals("", record.getField(2, StringValue.class).getValue());
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	@Test
-	public void testReadAll() throws IOException {
-		try {
-			final String fileContent = "111|222|333|444|555\n666|777|888|999|000|";
-			final FileInputSplit split = createTempFile(fileContent);	
-		
-			final Configuration parameters = new Configuration();
-			
-			new CsvInputFormat.ConfigBuilder(null, parameters)
-				.fieldDelimiter('|')
-				.field(IntValue.class, 0).field(IntValue.class, 1).field(IntValue.class, 2)
-				.field(IntValue.class, 3).field(IntValue.class, 4);
-			
-			format.configure(parameters);
-			format.open(split);
-			
-			Record record = new Record();
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals(111, record.getField(0, IntValue.class).getValue());
-			assertEquals(222, record.getField(1, IntValue.class).getValue());
-			assertEquals(333, record.getField(2, IntValue.class).getValue());
-			assertEquals(444, record.getField(3, IntValue.class).getValue());
-			assertEquals(555, record.getField(4, IntValue.class).getValue());
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals(666, record.getField(0, IntValue.class).getValue());
-			assertEquals(777, record.getField(1, IntValue.class).getValue());
-			assertEquals(888, record.getField(2, IntValue.class).getValue());
-			assertEquals(999, record.getField(3, IntValue.class).getValue());
-			assertEquals(000, record.getField(4, IntValue.class).getValue());
-			
-			assertNull(format.nextRecord(record));
-			assertTrue(format.reachedEnd());
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	@Test
-	public void testReadFirstN() throws IOException {
-		try {
-			final String fileContent = "111|222|333|444|555|\n666|777|888|999|000|";
-			final FileInputSplit split = createTempFile(fileContent);	
-		
-			final Configuration parameters = new Configuration();
-			
-			new CsvInputFormat.ConfigBuilder(null, parameters)
-			.fieldDelimiter('|')
-			.field(IntValue.class, 0).field(IntValue.class, 1);
-			
-			format.configure(parameters);
-			format.open(split);
-			
-			Record record = new Record();
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals(111, record.getField(0, IntValue.class).getValue());
-			assertEquals(222, record.getField(1, IntValue.class).getValue());
-			boolean notParsed = false;
-			try {
-				record.getField(2, IntValue.class);
-			} catch (IndexOutOfBoundsException ioo) {
-				notParsed = true;
-			}
-			assertTrue(notParsed);
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals(666, record.getField(0, IntValue.class).getValue());
-			assertEquals(777, record.getField(1, IntValue.class).getValue());
-			notParsed = false;
-			try {
-				record.getField(2, IntValue.class);
-			} catch (IndexOutOfBoundsException ioo) {
-				notParsed = true;
-			}
-			assertTrue(notParsed);
-			
-			assertNull(format.nextRecord(record));
-			assertTrue(format.reachedEnd());
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-		
-	}
-	
-	@Test
-	public void testReadSparse() throws IOException {
-		try {
-			final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
-			final FileInputSplit split = createTempFile(fileContent);	
-		
-			final Configuration parameters = new Configuration();
-			
-			new CsvInputFormat.ConfigBuilder(null, parameters)
-				.fieldDelimiter('|')
-				.field(IntValue.class, 0).field(IntValue.class, 3).field(IntValue.class, 7);
-			
-			format.configure(parameters);
-			format.open(split);
-			
-			Record record = new Record();
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals(111, record.getField(0, IntValue.class).getValue());
-			assertEquals(444, record.getField(1, IntValue.class).getValue());
-			assertEquals(888, record.getField(2, IntValue.class).getValue());
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals(000, record.getField(0, IntValue.class).getValue());
-			assertEquals(777, record.getField(1, IntValue.class).getValue());
-			assertEquals(333, record.getField(2, IntValue.class).getValue());
-			
-			assertNull(format.nextRecord(record));
-			assertTrue(format.reachedEnd());
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	@Test
-	public void testReadSparseShufflePosition() throws IOException {
-		try {
-			final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
-			final FileInputSplit split = createTempFile(fileContent);	
-		
-			final Configuration parameters = new Configuration();
-			
-			new CsvInputFormat.ConfigBuilder(null, parameters)
-				.fieldDelimiter('|')
-				.field(IntValue.class, 8).field(IntValue.class, 1).field(IntValue.class, 3);
-			
-			format.configure(parameters);
-			format.open(split);
-			
-			Record record = new Record();
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals(999, record.getField(0, IntValue.class).getValue());
-			assertEquals(222, record.getField(1, IntValue.class).getValue());
-			assertEquals(444, record.getField(2, IntValue.class).getValue());
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals(222, record.getField(0, IntValue.class).getValue());
-			assertEquals(999, record.getField(1, IntValue.class).getValue());
-			assertEquals(777, record.getField(2, IntValue.class).getValue());
-			
-			assertNull(format.nextRecord(record));
-			assertTrue(format.reachedEnd());
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	private FileInputSplit createTempFile(String content) throws IOException {
-		this.tempFile = File.createTempFile("test_contents", "tmp");
-		this.tempFile.deleteOnExit();
-		
-		DataOutputStream dos = new DataOutputStream(new FileOutputStream(tempFile));
-		dos.writeBytes(content);
-		dos.close();
-			
-		return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
-	}
-	
-	@Test
-	public void testWindowsLineEndRemoval() {
-		
-		//Check typical use case -- linux file is correct and it is set up to linuc(\n)
-		this.testRemovingTrailingCR("\n", "\n");
-		
-		//Check typical windows case -- windows file endings and file has windows file endings set up
-		this.testRemovingTrailingCR("\r\n", "\r\n");
-		
-		//Check problematic case windows file -- windows file endings(\r\n) but linux line endings (\n) set up
-		this.testRemovingTrailingCR("\r\n", "\n");
-		
-		//Check problematic case linux file -- linux file endings (\n) but windows file endings set up (\r\n)
-		//Specific setup for windows line endings will expect \r\n because it has to be set up and is not standard.
-	}
-	
-	private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreakerSetup) {
-		File tempFile=null;
-		
-		String fileContent = CsvInputFormatTest.FIRST_PART + lineBreakerInFile + CsvInputFormatTest.SECOND_PART + lineBreakerInFile;
-		
-		try {
-			// create input file
-			tempFile = File.createTempFile("CsvInputFormatTest", "tmp");
-			tempFile.deleteOnExit();
-			tempFile.setWritable(true);
-			
-			OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
-			wrt.write(fileContent);
-			wrt.close();
-			
-			//Instantiate input format
-			CsvInputFormat inputFormat = new CsvInputFormat();
-			
-			Configuration parameters = new Configuration();
-			new CsvInputFormat.ConfigBuilder(null, parameters)
-			.field(StringValue.class, 0).filePath(tempFile.toURI().toString());
-			
-			
-			inputFormat.configure(parameters);
-			
-			inputFormat.setDelimiter(lineBreakerSetup);
-			
-			FileInputSplit[] splits = inputFormat.createInputSplits(1);
-						
-			inputFormat.open(splits[0]);
-			
-			Record record = new Record();
-			
-			Record result = inputFormat.nextRecord(record);
-			
-			assertNotNull("Expecting to not return null", result);
-			
-			
-			
-			assertEquals(FIRST_PART, result.getField(0, StringValue.class).getValue());
-			
-			result = inputFormat.nextRecord(record);
-			
-			assertNotNull("Expecting to not return null", result);
-			assertEquals(SECOND_PART, result.getField(0, StringValue.class).getValue());
-			
-		}
-		catch (Throwable t) {
-			System.err.println("test failed with exception: " + t.getMessage());
-			t.printStackTrace(System.err);
-			fail("Test erroneous");
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java
deleted file mode 100644
index 9eb794f..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.junit.Assert;
-
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CsvOutputFormatTest {
-
-	protected Configuration config;
-	
-	protected File tempFile;
-	
-	private final CsvOutputFormat format = new CsvOutputFormat();
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Before
-	public void setup() throws IOException {
-		this.tempFile = File.createTempFile("test_output", "tmp");
-		this.format.setOutputFilePath(new Path(tempFile.toURI()));
-		this.format.setWriteMode(WriteMode.OVERWRITE);
-	}
-	
-	@After
-	public void setdown() throws Exception {
-		if (this.format != null) {
-			this.format.close();
-		}
-		if (this.tempFile != null) {
-			this.tempFile.delete();
-		}
-	}
-	
-	@Test
-	public void testConfigure() 
-	{
-		try {
-			Configuration config = new Configuration();
-			
-			// check missing number of fields
-			boolean validConfig = true;
-			try {
-				format.configure(config);
-			} catch(IllegalArgumentException iae) {
-				validConfig = false;
-			} catch(IllegalStateException ise) {
-				validConfig = false;
-			}
-			assertFalse(validConfig);
-			
-			// check missing file parser
-			config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
-			validConfig = true;
-			try {
-				format.configure(config);
-			} catch(IllegalArgumentException iae) {
-				validConfig = false;
-			} catch(IllegalStateException ise) {
-				validConfig = false;
-			}
-			assertFalse(validConfig);
-			
-			// check valid config
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class);
-			validConfig = true;
-			try {
-				format.configure(config);
-			} catch(IllegalArgumentException iae) {
-				validConfig = false;
-			}
-			assertTrue(validConfig);
-			
-			// check invalid file parser config
-			config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 3);
-			validConfig = true;
-			try {
-				format.configure(config);
-			} catch(IllegalArgumentException iae) {
-				validConfig = false;
-			}
-			assertFalse(validConfig);
-			
-			// check valid config
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, StringValue.class);
-			validConfig = true;
-			try {
-				format.configure(config);
-			} catch(IllegalArgumentException iae) {
-				validConfig = false;
-			}
-			assertTrue(validConfig);
-			
-			// check valid config
-			config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-			validConfig = true;
-			try {
-				format.configure(config);
-			} catch(IllegalArgumentException iae) {
-				validConfig = false;
-				System.out.println(iae.getMessage());
-			}
-			assertTrue(validConfig);
-			
-			// check invalid text pos config
-			config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
-			validConfig = true;
-			try {
-				format.configure(config);
-			} catch(IllegalArgumentException iae) {
-				validConfig = false;
-			}
-			assertFalse(validConfig);
-			
-			// check valid text pos config
-			config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 3);
-			config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 2, 9);
-			validConfig = true;
-			try {
-				format.configure(config);
-			} catch(IllegalArgumentException iae) {
-				validConfig = false;
-			}
-			assertTrue(validConfig);
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWriteNoRecPosNoLenient()
-	{
-		try {
-			Configuration config = new Configuration();
-			config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-			config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class);
-			
-			format.configure(config);
-			
-			try {
-				format.open(0, 1);
-			} catch (IOException e) {
-				fail(e.getMessage());
-			}
-			
-			Record r = new Record(2);
-			
-			try {
-				r.setField(0, new StringValue("Hello World"));
-				r.setField(1, new IntValue(42));
-				format.writeRecord(r);
-				
-				r.setField(0, new StringValue("AbCdE"));
-				r.setField(1, new IntValue(13));
-				format.writeRecord(r);
-				
-				format.close();
-				
-				BufferedReader dis = new BufferedReader(new FileReader(tempFile));
-				
-				assertTrue((dis.readLine()+"\n").equals("Hello World|42\n"));
-				assertTrue((dis.readLine()+"\n").equals("AbCdE|13\n"));
-				
-				dis.close();
-				
-			} catch (IOException e) {
-				fail(e.getMessage());
-			}
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWriteNoRecPosNoLenientFail()
-	{
-		try {
-			Configuration config = new Configuration();
-			config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-			config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class);
-			
-			format.configure(config);
-			
-			try {
-				format.open(0, 1);
-			} catch (IOException e) {
-				fail(e.getMessage());
-			}
-			
-			Record r = new Record(2);
-			
-			boolean success = true;
-			
-			try {
-				r.setField(0, new StringValue("Hello World"));
-				r.setField(1, new IntValue(42));
-				format.writeRecord(r);
-				
-				r.setNull(0);
-				r.setField(1, new IntValue(13));
-				format.writeRecord(r);
-				
-				format.close();
-							
-			} catch (IOException e) {
-				success = false;
-			} catch (RuntimeException re) {
-				success = false;
-			}
-			
-			assertFalse(success);
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWriteNoRecPosLenient()
-	{
-		try {
-			Configuration config = new Configuration();
-			config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-			config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class);
-			config.setBoolean(CsvOutputFormat.LENIENT_PARSING, true);
-			
-			format.configure(config);
-			
-			try {
-				format.open(0, 1);
-			} catch (IOException e) {
-				fail(e.getMessage());
-			}
-			
-			Record r = new Record(2);
-			
-			try {
-				r.setField(0, new StringValue("Hello World"));
-				r.setField(1, new IntValue(42));
-				format.writeRecord(r);
-				
-				r.setNull(0);
-				r.setField(1, new IntValue(13));
-				format.writeRecord(r);
-				
-				format.close();
-				
-				BufferedReader dis = new BufferedReader(new FileReader(tempFile));
-				
-				assertTrue((dis.readLine()+"\n").equals("Hello World|42\n"));
-				assertTrue((dis.readLine()+"\n").equals("|13\n"));
-				
-				dis.close();
-				
-			} catch (IOException e) {
-				fail(e.getMessage());
-			}
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWriteRecPosNoLenient()
-	{
-		try {
-			Configuration config = new Configuration();
-			config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-			config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
-			config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, StringValue.class);
-			config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
-			
-			format.configure(config);
-			
-			try {
-				format.open(0, 1);
-			} catch (IOException e) {
-				fail(e.getMessage());
-			}
-			
-			Record r = new Record(2);
-			
-			try {
-				r.setField(0, new StringValue("Hello World"));
-				r.setField(1, new IntValue(42));
-				r.setField(2, new StringValue("Hello User"));
-				format.writeRecord(r);
-				
-				r.setField(0, new StringValue("AbCdE"));
-				r.setField(1, new IntValue(13));
-				r.setField(2, new StringValue("ZyXvW"));
-				format.writeRecord(r);
-				
-				format.close();
-				
-				BufferedReader dis = new BufferedReader(new FileReader(tempFile));
-				
-				assertTrue((dis.readLine()+"\n").equals("Hello User|Hello World\n"));
-				assertTrue((dis.readLine()+"\n").equals("ZyXvW|AbCdE\n"));
-				
-				dis.close();
-				
-			} catch (IOException e) {
-				fail(e.getMessage());
-			}
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWriteRecPosNoLenientFail()
-	{
-		try {
-			Configuration config = new Configuration();
-			config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-			config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
-			config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, StringValue.class);
-			config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
-			
-			format.configure(config);
-			
-			try {
-				format.open(0, 1);
-			} catch (IOException e) {
-				fail(e.getMessage());
-			}
-			
-			Record r = new Record(2);
-			
-			boolean success = true;
-			
-			try {
-				r.setField(0, new StringValue("Hello World"));
-				r.setField(1, new IntValue(42));
-				r.setField(2, new StringValue("Hello User"));
-				format.writeRecord(r);
-	
-				r = new Record();
-				
-				r.setField(0, new StringValue("AbCdE"));
-				r.setField(1, new IntValue(13));
-				format.writeRecord(r);
-				
-				format.close();
-				
-			} catch (IOException e) {
-				success = false;
-			} catch (RuntimeException re) {
-				success = false;
-			}
-			
-			assertFalse(success);
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWriteRecPosLenient()
-	{
-		try {
-			Configuration config = new Configuration();
-			config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-			config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
-			config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2);
-			config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, StringValue.class);
-			config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
-			config.setBoolean(CsvOutputFormat.LENIENT_PARSING, true);
-			
-			format.configure(config);
-			
-			try {
-				format.open(0, 1);
-			} catch (IOException e) {
-				fail(e.getMessage());
-			}
-			
-			Record r = new Record(2);
-			
-			try {
-				r.setField(0, new StringValue("Hello World"));
-				r.setField(1, new IntValue(42));
-				r.setField(2, new StringValue("Hello User"));
-				format.writeRecord(r);
-	
-				r = new Record();
-				
-				r.setField(0, new StringValue("AbCdE"));
-				r.setField(1, new IntValue(13));
-				format.writeRecord(r);
-				
-				format.close();
-				
-				BufferedReader dis = new BufferedReader(new FileReader(tempFile));
-				
-				assertTrue((dis.readLine()+"\n").equals("Hello User|Hello World\n"));
-				assertTrue((dis.readLine()+"\n").equals("|AbCdE\n"));
-				
-				dis.close();
-				
-			} catch (IOException e) {
-				fail(e.getMessage());
-			}
-		}
-		catch (Exception ex) {
-			Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-	}
-		
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
deleted file mode 100644
index 4aec38e..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.ExternalProcessFixedLengthInputFormat;
-import org.apache.flink.api.java.record.io.ExternalProcessInputFormat;
-import org.apache.flink.api.java.record.io.ExternalProcessInputSplit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.OperatingSystem;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ExternalProcessFixedLengthInputFormatTest {
-
-private ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> format;
-	
-	private final String neverEndingCommand = "cat /dev/urandom";
-	private final String thousandRecordsCommand = "dd if=/dev/zero bs=8 count=1000";
-	private final String incompleteRecordsCommand = "dd if=/dev/zero bs=7 count=2";
-	private final String failingCommand = "ls /I/do/not/exist";
-	
-	@Before
-	public void prepare() {
-		format = new MyExternalProcessTestInputFormat();
-	}
-	
-	@Test
-	public void testOpen() {
-		
-		if(OperatingSystem.isWindows()) {
-			return;
-		}
-		
-		Configuration config = new Configuration();
-		config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-		ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
-				
-		boolean processDestroyed = false;
-		try {
-			format.configure(config);
-			format.open(split);
-			
-			String[] cmd = {"/bin/sh","-c","ps aux | grep -v grep | grep \"cat /dev/urandom\" | wc -l"};
-			
-			byte[] wcOut = new byte[128];
-			Process p = Runtime.getRuntime().exec(cmd);
-			p.getInputStream().read(wcOut);
-			int pCnt = Integer.parseInt(new String(wcOut).trim());
-			Assert.assertTrue(pCnt > 0);
-
-			format.close();
-		} catch (IOException e) {
-			Assert.fail();
-		} catch (RuntimeException e) {
-			if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) {
-				processDestroyed = true;
-			}
-		} finally {
-			Assert.assertTrue(processDestroyed);
-		}
-	}
-	
-	@Test
-	public void testCheckExitCode() {
-		
-		if(OperatingSystem.isWindows()) {
-			return;
-		}
-		
-		Configuration config = new Configuration();
-		config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-		ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, failingCommand);
-		
-		format.configure(config);
-		boolean invalidExitCode = false;
-		try {
-			format.open(split);
-			format.waitForProcessToFinish();
-			format.close();
-		} catch (IOException e) {
-			Assert.fail();
-		} catch (InterruptedException e) {
-			Assert.fail();
-		} catch (RuntimeException e) {
-			if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) {
-				invalidExitCode = true;	
-			}
-		}
-		Assert.assertTrue(invalidExitCode);
-		
-		invalidExitCode = false;
-		config.setString(ExternalProcessInputFormat.ALLOWEDEXITCODES_PARAMETER_KEY,"0,1,2");
-		format.configure(config);
-		try {
-			format.open(split);
-			// wait for process to start...
-			Thread.sleep(100);
-			format.close();
-		} catch (IOException e) {
-			Assert.fail();
-		} catch (InterruptedException e) {
-			Assert.fail();
-		} catch (RuntimeException e) {
-			if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) {
-				invalidExitCode = true;	
-			}
-		}
-		Assert.assertTrue(!invalidExitCode);
-		
-	}
-	
-	@Test
-	public void testUserCodeTermination() {
-		
-		if(OperatingSystem.isWindows()) {
-			return;
-		}
-		
-		Configuration config = new Configuration();
-		config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-		config.setInteger(MyExternalProcessTestInputFormat.FAILCOUNT_PARAMETER_KEY, 100);
-		ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
-		Record record = new Record();
-				
-		boolean userException = false;
-		boolean processDestroyed = false;
-		try {
-			format.configure(config);
-			format.open(split);
-			while(!format.reachedEnd()) {
-				try {
-					format.nextRecord(record);
-				} catch(RuntimeException re) {
-					userException = true;
-					break;
-				}
-			}
-			format.close();
-		} catch (IOException e) {
-			Assert.fail();
-		} catch (RuntimeException e) {
-			if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) {
-				processDestroyed = true;
-			}
-		} finally {
-			Assert.assertTrue(userException && processDestroyed);
-		}
-	}
-	
-	@Test
-	public void testReadStream() {
-		
-		if(OperatingSystem.isWindows()) {
-			return;
-		}
-		
-		Configuration config = new Configuration();
-		config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-		ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.thousandRecordsCommand);
-		Record record = new Record();
-
-		int cnt = 0;
-		try {
-			format.configure(config);
-			format.open(split);
-			while(!format.reachedEnd()) {
-				if (format.nextRecord(record) != null) {
-					cnt++;
-				}
-			}
-			format.close();
-		} catch (IOException e) {
-			Assert.fail();
-		} catch (RuntimeException e) {
-			Assert.fail(e.getMessage());
-		}
-		Assert.assertTrue(cnt == 1000);
-	}
-	
-	@Test
-	public void testReadInvalidStream() {
-		
-		if(OperatingSystem.isWindows()) {
-			return;
-		}
-		
-		Configuration config = new Configuration();
-		config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-		ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.incompleteRecordsCommand);
-		Record record = new Record();
-
-		boolean incompleteRecordDetected = false;
-		@SuppressWarnings("unused")
-		int cnt = 0;
-		try {
-			format.configure(config);
-			format.open(split);
-			while(!format.reachedEnd()) {
-				if (format.nextRecord(record) != null) {
-					cnt++;
-				}
-			}
-			format.close();
-		} catch (IOException e) {
-			Assert.fail();
-		} catch (RuntimeException e) {
-			if(e.getMessage().equals("External process produced incomplete record")) {
-				incompleteRecordDetected = true;
-			} else {
-				Assert.fail(e.getMessage());
-			}
-		}
-		Assert.assertTrue(incompleteRecordDetected);
-	}
-	
-	private final class MyExternalProcessTestInputFormat extends ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> {
-		private static final long serialVersionUID = 1L;
-
-		public static final String FAILCOUNT_PARAMETER_KEY = "test.failingCount";
-		
-		private long cnt = 0;
-		private int failCnt;
-		
-		@Override
-		public void configure(Configuration parameters) {
-			super.configure(parameters);
-			failCnt = parameters.getInteger(FAILCOUNT_PARAMETER_KEY, Integer.MAX_VALUE);
-		}
-		
-		@Override
-		public boolean readBytes(Record record, byte[] bytes, int startPos) {
-
-			if(cnt == failCnt) {
-				throw new RuntimeException("This is a test exception!");
-			}
-			
-			int v1 = 0;
-			v1 = v1        | (0xFF & bytes[startPos+0]);
-			v1 = (v1 << 8) | (0xFF & bytes[startPos+1]);
-			v1 = (v1 << 8) | (0xFF & bytes[startPos+2]);
-			v1 = (v1 << 8) | (0xFF & bytes[startPos+3]);
-			
-			int v2 = 0;
-			v2 = v2        | (0xFF & bytes[startPos+4]);
-			v2 = (v2 << 8) | (0xFF & bytes[startPos+5]);
-			v2 = (v2 << 8) | (0xFF & bytes[startPos+6]);
-			v2 = (v2 << 8) | (0xFF & bytes[startPos+7]);
-			
-			record.setField(0,new IntValue(v1));
-			record.setField(1,new IntValue(v2));
-			
-			cnt++;
-			
-			return true;
-		}
-
-		@Override
-		public ExternalProcessInputSplit[] createInputSplits(int minNumSplits)
-				throws IOException {
-			return null;
-		}
-
-		@Override
-		public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
-			return new DefaultInputSplitAssigner(splits);
-		}
-
-		@Override
-		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-			return null;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
deleted file mode 100644
index 6b8cacb..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.ExternalProcessInputFormat;
-import org.apache.flink.api.java.record.io.ExternalProcessInputSplit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.OperatingSystem;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ExternalProcessInputFormatTest {
-
-	private ExternalProcessInputFormat<ExternalProcessInputSplit> format;
-	
-	private final String neverEndingCommand = "cat /dev/urandom";
-	private final String thousandRecordsCommand = "dd if=/dev/zero bs=8 count=1000";
-	private final String failingCommand = "ls /I/do/not/exist";
-	
-	@Before
-	public void prepare() {
-		format = new MyExternalProcessTestInputFormat();
-	}
-	
-	@Test
-	public void testOpen() {
-		
-		if(OperatingSystem.isWindows()) {
-			return;
-		}
-		
-		Configuration config = new Configuration();
-		ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
-		
-		boolean processDestroyed = false;
-		try {
-			format.configure(config);
-			format.open(split);
-			
-			String[] cmd = {"/bin/sh","-c","ps aux | grep -v grep | grep \"cat /dev/urandom\" | wc -l"};
-			
-			byte[] wcOut = new byte[128];
-			Process p = Runtime.getRuntime().exec(cmd);
-			p.getInputStream().read(wcOut);
-			int pCnt = Integer.parseInt(new String(wcOut).trim());
-			Assert.assertTrue(pCnt > 0);
-
-			format.close();
-		} catch (IOException e) {
-			Assert.fail();
-		} catch (RuntimeException e) {
-			if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) {
-				processDestroyed = true;
-			}
-		} finally {
-			Assert.assertTrue(processDestroyed);
-		}
-	}
-	
-	@Test
-	public void testCheckExitCode() {
-		
-		if(OperatingSystem.isWindows()) {
-			return;
-		}
-		
-		Configuration config = new Configuration();
-		ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, failingCommand);
-		
-		format.configure(config);
-		boolean invalidExitCode = false;
-		try {
-			format.open(split);
-			format.waitForProcessToFinish();
-			format.close();
-		} catch (IOException e) {
-			Assert.fail();
-		} catch (InterruptedException e) {
-			Assert.fail();
-		} catch (RuntimeException e) {
-			if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) {
-				invalidExitCode = true;	
-			}
-		}
-		Assert.assertTrue(invalidExitCode);
-		
-		invalidExitCode = false;
-		config.setString(ExternalProcessInputFormat.ALLOWEDEXITCODES_PARAMETER_KEY,"0,1,2");
-		format.configure(config);
-		try {
-			format.open(split);
-			format.waitForProcessToFinish();
-			format.close();
-		} catch (IOException e) {
-			Assert.fail();
-		} catch (InterruptedException e) {
-			Assert.fail();
-		} catch (RuntimeException e) {
-			if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) {
-				invalidExitCode = true;	
-			}
-		}
-		Assert.assertTrue(!invalidExitCode);
-		
-	}
-	
-	@Test
-	public void testUserCodeTermination() {
-		
-		if(OperatingSystem.isWindows()) {
-			return;
-		}
-		
-		Configuration config = new Configuration();
-		config.setInteger(MyExternalProcessTestInputFormat.FAILCOUNT_PARAMETER_KEY, 100);
-		ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
-		Record record = new Record();
-				
-		boolean userException = false;
-		boolean processDestroyed = false;
-		try {
-			format.configure(config);
-			format.open(split);
-			while(!format.reachedEnd()) {
-				try {
-					format.nextRecord(record);
-				} catch(RuntimeException re) {
-					userException = true;
-					break;
-				}
-			}
-			format.close();
-		} catch (IOException e) {
-			Assert.fail();
-		} catch (RuntimeException e) {
-			if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) {
-				processDestroyed = true;
-			}
-		} finally {
-			Assert.assertTrue(userException && processDestroyed);
-		}
-	}
-	
-	@Test
-	public void testReadStream() {
-		
-		if(OperatingSystem.isWindows()) {
-			return;
-		}
-		
-		Configuration config = new Configuration();
-		ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.thousandRecordsCommand);
-		Record record = new Record();
-
-		int cnt = 0;
-		try {
-			format.configure(config);
-			format.open(split);
-			while(!format.reachedEnd()) {
-				if (format.nextRecord(record) != null) {
-					cnt++;
-				}
-			}
-			format.close();
-		} catch (IOException e) {
-			Assert.fail();
-		} catch (RuntimeException e) {
-			Assert.fail(e.getMessage());
-		}
-		Assert.assertTrue("Expected read count was 1000, actual read count was "+cnt, cnt == 1000);
-	}
-	
-	private final class MyExternalProcessTestInputFormat extends ExternalProcessInputFormat<ExternalProcessInputSplit> {
-		private static final long serialVersionUID = 1L;
-
-		public static final String FAILCOUNT_PARAMETER_KEY = "test.failingCount";
-		
-		private byte[] buf = new byte[8];
-		
-		private long cnt = 0;
-		private int failCnt;
-		private boolean end;
-		
-		@Override
-		public void configure(Configuration parameters) {
-			super.configure(parameters);
-			failCnt = parameters.getInteger(FAILCOUNT_PARAMETER_KEY, Integer.MAX_VALUE);
-		}
-		
-		@Override
-		public void open(GenericInputSplit split) throws IOException {
-			super.open(split);
-			
-			this.end = false;
-		}
-		
-		@Override
-		public ExternalProcessInputSplit[] createInputSplits(int minNumSplits) {
-			return null;
-		}
-
-		@Override
-		public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
-			return new DefaultInputSplitAssigner(splits);
-		}
-		
-		@Override
-		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-			return null;
-		}
-
-		@Override
-		public Record nextRecord(Record reuse) throws IOException {
-			
-			if(cnt > failCnt) {
-				throw new RuntimeException("This is a test exception!");
-			}
-			
-			int totalReadCnt = 0;
-			
-			do {
-				int readCnt = super.extProcOutStream.read(buf, totalReadCnt, buf.length-totalReadCnt);
-				
-				if(readCnt == -1) {
-					this.end = true;
-					return null;
-				} else {
-					totalReadCnt += readCnt;
-				}
-				
-			} while(totalReadCnt != 8);
-				
-			int v1 = 0;
-			v1 = v1        | (0xFF & buf[0]);
-			v1 = (v1 << 8) | (0xFF & buf[1]);
-			v1 = (v1 << 8) | (0xFF & buf[2]);
-			v1 = (v1 << 8) | (0xFF & buf[3]);
-			
-			int v2 = 0;
-			v2 = v2        | (0xFF & buf[4]);
-			v2 = (v2 << 8) | (0xFF & buf[5]);
-			v2 = (v2 << 8) | (0xFF & buf[6]);
-			v2 = (v2 << 8) | (0xFF & buf[7]);
-			
-			reuse.setField(0,new IntValue(v1));
-			reuse.setField(1,new IntValue(v2));
-			
-			this.cnt++;
-			
-			return reuse;
-		}
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			return this.end;
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
deleted file mode 100644
index 3d441e6..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FixedLenghtInputFormatTest {
-
-	protected Configuration config;
-	
-	protected File tempFile;
-	
-	private final FixedLengthInputFormat format = new MyFixedLengthInputFormat();
-	
-	// --------------------------------------------------------------------------------------------
-
-	@Before
-	public void setup() {
-		format.setFilePath("file:///some/file/that/will/not/be/read");
-	}
-	
-	@After
-	public void setdown() throws Exception {
-		if (this.format != null) {
-			this.format.close();
-		}
-		if (this.tempFile != null) {
-			this.tempFile.delete();
-		}
-	}
-
-	@Test
-	public void testOpen() throws IOException {
-		final int[] fileContent = {1,2,3,4,5,6,7,8};
-		final FileInputSplit split = createTempFile(fileContent);	
-	
-		final Configuration parameters = new Configuration();
-		parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-		
-		format.configure(parameters);
-		format.open(split);
-		assertEquals(0, format.getSplitStart());
-		assertEquals(0, format.getReadBufferSize() % 8);
-		format.close();
-
-		parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 13);
-		format.configure(parameters);
-		format.close();
-		format.open(split);
-		assertEquals(0, format.getReadBufferSize() % 13);
-		format.close();
-		
-		parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 27);
-		format.configure(parameters);
-		format.close();
-		format.open(split);
-		assertEquals(0, format.getReadBufferSize() % 27);
-		format.close();
-		
-	}
-	
-	@Test
-	public void testRead() throws IOException {
-		final int[] fileContent = {1,2,3,4,5,6,7,8};
-		final FileInputSplit split = createTempFile(fileContent);
-		
-		final Configuration parameters = new Configuration();
-		
-		parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-		
-		format.configure(parameters);
-		format.open(split);
-		
-		Record record = new Record();
-		
-		assertNotNull(format.nextRecord(record));
-		assertEquals(1, record.getField(0, IntValue.class).getValue());
-		assertEquals(2, record.getField(1, IntValue.class).getValue());	
-		
-		assertNotNull(format.nextRecord(record));
-		assertEquals(3, record.getField(0, IntValue.class).getValue());
-		assertEquals(4, record.getField(1, IntValue.class).getValue());
-		
-		assertNotNull(format.nextRecord(record));
-		assertEquals(5, record.getField(0, IntValue.class).getValue());
-		assertEquals(6, record.getField(1, IntValue.class).getValue());
-		
-		assertNotNull(format.nextRecord(record));
-		assertEquals(7, record.getField(0, IntValue.class).getValue());
-		assertEquals(8, record.getField(1, IntValue.class).getValue());
-		
-		assertNull(format.nextRecord(record));
-		assertTrue(format.reachedEnd());
-	}
-	
-	
-	@Test
-	public void testReadFail() throws IOException {
-		final int[] fileContent = {1,2,3,4,5,6,7,8,9};
-		final FileInputSplit split = createTempFile(fileContent);
-		
-		final Configuration parameters = new Configuration();
-		parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-		
-		format.configure(parameters);
-		format.open(split);
-		
-		Record record = new Record();
-
-		try {
-			assertNotNull(format.nextRecord(record));
-			assertEquals(1, record.getField(0, IntValue.class).getValue());
-			assertEquals(2, record.getField(1, IntValue.class).getValue());
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals(3, record.getField(0, IntValue.class).getValue());
-			assertEquals(4, record.getField(1, IntValue.class).getValue());
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals(5, record.getField(0, IntValue.class).getValue());
-			assertEquals(6, record.getField(1, IntValue.class).getValue());
-			
-			assertNotNull(format.nextRecord(record));
-			assertEquals(7, record.getField(0, IntValue.class).getValue());
-			assertEquals(8, record.getField(1, IntValue.class).getValue());
-			
-			assertNull(format.nextRecord(record));
-		} catch(IOException ioe) {
-			assertTrue(ioe.getMessage().equals("Unable to read full record"));
-		}
-	}
-	
-	
-	private FileInputSplit createTempFile(int[] contents) throws IOException {
-		this.tempFile = File.createTempFile("test_contents", "tmp");
-		this.tempFile.deleteOnExit();
-		
-		DataOutputStream dos = new DataOutputStream(new FileOutputStream(tempFile));
-		
-		for(int i : contents) {
-			dos.writeInt(i);
-		}
-		
-		dos.close();
-			
-		return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
-	}
-		
-	
-	private final class MyFixedLengthInputFormat extends FixedLengthInputFormat {
-		private static final long serialVersionUID = 1L;
-
-		IntValue p1 = new IntValue();
-		IntValue p2 = new IntValue();
-		
-		@Override
-		public boolean readBytes(Record target, byte[] buffer, int startPos) {
-			int v1 = 0;
-			v1 = (v1 | buffer[startPos+0]) << 8;
-			v1 = (v1 | buffer[startPos+1]) << 8;
-			v1 = (v1 | buffer[startPos+2]) << 8;
-			v1 = (v1 | buffer[startPos+3]);
-			p1.setValue(v1);
-			
-			int v2 = 0;
-			v2 = (v2 | buffer[startPos+4]) << 8;
-			v2 = (v2 | buffer[startPos+5]) << 8;
-			v2 = (v2 | buffer[startPos+6]) << 8;
-			v2 = (v2 | buffer[startPos+7]);
-			p2.setValue(v2);
-			
-			target.setField(0, p1);
-			target.setField(1, p2);
-			
-			return true;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
deleted file mode 100644
index 8ca19cf..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.OutputStreamWriter;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.Test;
-
-public class TextInputFormatTest {
-	/**
-	 * The TextInputFormat seems to fail reading more than one record. I guess its
-	 * an off by one error.
-	 * 
-	 * The easiest workaround is to setParameter(TextInputFormat.CHARSET_NAME, "ASCII");
-	 */
-	@Test
-	public void testPositionBug() {
-		final String FIRST = "First line";
-		final String SECOND = "Second line";
-		
-		try {
-			// create input file
-			File tempFile = File.createTempFile("TextInputFormatTest", "tmp");
-			tempFile.deleteOnExit();
-			tempFile.setWritable(true);
-			
-			FileWriter writer = new FileWriter(tempFile);
-			writer.append(FIRST).append('\n');
-			writer.append(SECOND).append('\n');
-			writer.close();
-			
-			TextInputFormat inputFormat = new TextInputFormat();
-			inputFormat.setFilePath(tempFile.toURI().toString());
-			
-			Configuration parameters = new Configuration(); 
-			inputFormat.configure(parameters);
-			
-			FileInputSplit[] splits = inputFormat.createInputSplits(1);
-			assertTrue("expected at least one input split", splits.length >= 1);
-			
-			inputFormat.open(splits[0]);
-			
-			Record r = new Record();
-			assertNotNull("Expecting first record here", inputFormat.nextRecord(r));
-			assertEquals(FIRST, r.getField(0, StringValue.class).getValue());
-			
-			assertNotNull("Expecting second record here",inputFormat.nextRecord(r ));
-			assertEquals(SECOND, r.getField(0, StringValue.class).getValue());
-			
-			assertNull("The input file is over", inputFormat.nextRecord(r));
-		}
-		catch (Throwable t) {
-			System.err.println("test failed with exception: " + t.getMessage());
-			t.printStackTrace(System.err);
-			fail("Test erroneous");
-		}
-	}
-
-	
-	/**
-	 * This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should be removed 
-	 */
-	@Test
-	public void testRemovingTrailingCR() {
-		
-		testRemovingTrailingCR("\n","\n");
-		testRemovingTrailingCR("\r\n","\n");
-		
-		testRemovingTrailingCR("|","|");
-		testRemovingTrailingCR("|","\n");
-
-	}
-
-	private void testRemovingTrailingCR(String lineBreaker,String delimiter) {
-		File tempFile;
-		
-		String FIRST = "First line";
-		String SECOND = "Second line";
-		String CONTENT = FIRST + lineBreaker + SECOND + lineBreaker;
-		
-		try {
-			// create input file
-			tempFile = File.createTempFile("TextInputFormatTest", "tmp");
-			tempFile.deleteOnExit();
-			tempFile.setWritable(true);
-			
-			OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
-			wrt.write(CONTENT);
-			wrt.close();
-			
-			TextInputFormat inputFormat = new TextInputFormat();
-			inputFormat.setFilePath(tempFile.toURI().toString());
-			
-			Configuration parameters = new Configuration(); 
-			inputFormat.configure(parameters);
-			
-			inputFormat.setDelimiter(delimiter);
-			
-			FileInputSplit[] splits = inputFormat.createInputSplits(1);
-						
-			inputFormat.open(splits[0]);
-			
-			Record r = new Record();
-			if ( (delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n") ) )
-					|| (lineBreaker.equals(delimiter)) ){
-
-				assertNotNull("Expecting first record here", inputFormat.nextRecord(r));
-				assertEquals(FIRST, r.getField(0, StringValue.class).getValue());
-				
-				assertNotNull("Expecting second record here",inputFormat.nextRecord(r ));
-				assertEquals(SECOND, r.getField(0, StringValue.class).getValue());
-				
-				assertNull("The input file is over", inputFormat.nextRecord(r));
-			} else {
-				assertNotNull("Expecting first record here", inputFormat.nextRecord(r));
-				assertEquals(CONTENT, r.getField(0, StringValue.class).getValue());
-			}
-			
-			
-		}
-		catch (Throwable t) {
-			System.err.println("test failed with exception: " + t.getMessage());
-			t.printStackTrace(System.err);
-			fail("Test erroneous");
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
index 5ff9eaf..78d61d1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
@@ -169,8 +169,7 @@ public abstract class CostEstimator {
 		switch (n.getDriverStrategy()) {
 		case NONE:
 		case UNARY_NO_OP:
-		case BINARY_NO_OP:	
-		case COLLECTOR_MAP:
+		case BINARY_NO_OP:
 		case MAP:
 		case MAP_PARTITION:
 		case FLAT_MAP:

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
deleted file mode 100644
index 9c1bcd3..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.CollectorMapDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>Map</i> operator node.
- */
-public class CollectorMapNode extends SingleInputNode {
-	
-	private final List<OperatorDescriptorSingle> possibleProperties;
-
-	
-	public CollectorMapNode(SingleInputOperator<?, ?, ?> operator) {
-		super(operator);
-		
-		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new CollectorMapDescriptor());
-	}
-
-	@Override
-	public String getOperatorName() {
-		return "Map";
-	}
-
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.possibleProperties;
-	}
-
-	/**
-	 * Computes the estimates for the Map operator. Map takes one value and transforms it into another value.
-	 * The cardinality consequently stays the same.
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
deleted file mode 100644
index bcd4d73..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class CollectorMapDescriptor extends OperatorDescriptorSingle {
-
-	@Override
-	public DriverStrategy getStrategy() {
-		return DriverStrategy.COLLECTOR_MAP;
-	}
-
-	@Override
-	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.COLLECTOR_MAP);
-	}
-
-	@Override
-	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-		RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-		rgp.setAnyDistribution();
-		return Collections.singletonList(rgp);
-	}
-
-	@Override
-	protected List<RequestedLocalProperties> createPossibleLocalProperties() {
-		return Collections.singletonList(new RequestedLocalProperties());
-	}
-	
-	@Override
-	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
-		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
-		{
-			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-		}
-		gProps.clearUniqueFieldCombinations();
-		return gProps;
-	}
-	
-	@Override
-	public LocalProperties computeLocalProperties(LocalProperties lProps) {
-		return lProps.clearUniqueFieldSets();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index aaabac5..fc5eb21 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -418,7 +418,6 @@ public class PlanJSONDumpGenerator {
 				locString = "No-Op";
 				break;
 				
-			case COLLECTOR_MAP:
 			case MAP:
 				locString = "Map";
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
index d5ddf4d..1125c29 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
@@ -57,7 +57,6 @@ public class JsonMapper {
 			case UNARY_NO_OP:
 				return "No-Op";
 
-			case COLLECTOR_MAP:
 			case MAP:
 				return "Map";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
index bcdee14..3f3eae1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
@@ -47,7 +47,6 @@ import org.apache.flink.optimizer.dag.BulkIterationNode;
 import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
 import org.apache.flink.optimizer.dag.CoGroupNode;
 import org.apache.flink.optimizer.dag.CoGroupRawNode;
-import org.apache.flink.optimizer.dag.CollectorMapNode;
 import org.apache.flink.optimizer.dag.CrossNode;
 import org.apache.flink.optimizer.dag.DagConnection;
 import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -144,9 +143,6 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
 		else if (c instanceof MapPartitionOperatorBase) {
 			n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
 		}
-		else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
-			n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
-		}
 		else if (c instanceof FlatMapOperatorBase) {
 			n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
deleted file mode 100644
index 60bc798..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Map task which is executed by a Task Manager. The task has a single
- * input and one or multiple outputs. It is provided with a MapFunction
- * implementation.
- * <p>
- * The MapTask creates an iterator over all key-value pairs of its input and hands that to the <code>map()</code> method
- * of the MapFunction.
- * 
- * @see GenericCollectorMap
- * 
- * @param <IT> The mapper's input data type.
- * @param <OT> The mapper's output data type.
- */
-@SuppressWarnings("deprecation")
-public class CollectorMapDriver<IT, OT> implements Driver<GenericCollectorMap<IT, OT>, OT> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(CollectorMapDriver.class);
-
-
-	private TaskContext<GenericCollectorMap<IT, OT>, OT> taskContext;
-	
-	private volatile boolean running;
-
-	private boolean objectReuseEnabled = false;
-
-	@Override
-	public void setup(TaskContext<GenericCollectorMap<IT, OT>, OT> context) {
-		this.taskContext = context;
-		this.running = true;
-	}
-
-	@Override
-	public int getNumberOfInputs() {
-		return 1;
-	}
-
-	@Override
-	public Class<GenericCollectorMap<IT, OT>> getStubType() {
-		@SuppressWarnings("unchecked")
-		final Class<GenericCollectorMap<IT, OT>> clazz = (Class<GenericCollectorMap<IT, OT>>) (Class<?>) GenericCollectorMap.class;
-		return clazz;
-	}
-
-	@Override
-	public int getNumberOfDriverComparators() {
-		return 0;
-	}
-
-	@Override
-	public void prepare() {
-		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
-		this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("CollectorMapDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
-		}
-	}
-
-	@Override
-	public void run() throws Exception {
-		// cache references on the stack
-		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
-		final GenericCollectorMap<IT, OT> stub = this.taskContext.getStub();
-		final Collector<OT> output = this.taskContext.getOutputCollector();
-
-		if (objectReuseEnabled) {
-			IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
-
-
-			while (this.running && ((record = input.next(record)) != null)) {
-				stub.map(record, output);
-			}
-		} else {
-			IT record;
-			while (this.running && ((record = input.next()) != null)) {
-				stub.map(record, output);
-			}
-		}
-	}
-
-	@Override
-	public void cleanup() {
-		// mappers need no cleanup, since no strategies are used.
-	}
-
-	@Override
-	public void cancel() {
-		this.running = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index b069f12..12da126 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -23,7 +23,6 @@ import static org.apache.flink.runtime.operators.DamBehavior.MATERIALIZING;
 import static org.apache.flink.runtime.operators.DamBehavior.PIPELINED;
 import org.apache.flink.runtime.operators.chaining.ChainedAllReduceDriver;
 
-import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedMapDriver;
@@ -40,8 +39,6 @@ public enum DriverStrategy {
 	// a binary no-op operator. non implementation available
 	BINARY_NO_OP(null, null, PIPELINED, PIPELINED, 0),
 
-	// the old mapper
-	COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, 0),
 	// the proper mapper
 	MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, 0),
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
deleted file mode 100644
index 8900ed7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.chaining;
-
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.BatchTask;
-
-@SuppressWarnings("deprecation")
-public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
-
-	private GenericCollectorMap<IT, OT> mapper;
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void setup(AbstractInvokable parent) {
-		@SuppressWarnings("unchecked")
-		final GenericCollectorMap<IT, OT> mapper =
-			BatchTask.instantiateUserCode(this.config, userCodeClassLoader, GenericCollectorMap.class);
-		this.mapper = mapper;
-		mapper.setRuntimeContext(getUdfRuntimeContext());
-	}
-
-	@Override
-	public void openTask() throws Exception {
-		Configuration stubConfig = this.config.getStubParameters();
-		BatchTask.openUserCode(this.mapper, stubConfig);
-	}
-
-	@Override
-	public void closeTask() throws Exception {
-		BatchTask.closeUserCode(this.mapper);
-	}
-
-	@Override
-	public void cancelTask() {
-		try {
-			this.mapper.close();
-		} catch (Throwable t) {
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public RichFunction getStub() {
-		return this.mapper;
-	}
-
-	public String getTaskName() {
-		return this.taskName;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void collect(IT record) {
-		try {
-			this.mapper.map(record, this.outputCollector);
-		} catch (Exception ex) {
-			throw new ExceptionInChainedStubException(this.taskName, ex);
-		}
-	}
-
-	@Override
-	public void close() {
-		this.outputCollector.close();
-	}
-}