Posted to commits@flink.apache.org by al...@apache.org on 2017/09/27 11:09:12 UTC

[07/20] flink git commit: [FLINK-4048] Remove Hadoop from DataSet API

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
new file mode 100644
index 0000000..52a6bed
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
+
+@Public
+class HadoopInputFormat[K, V](
+    mapredInputFormat: InputFormat[K, V],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    job: JobConf)
+  extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {
+
+  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]) = {
+    this(mapredInputFormat, keyClass, valueClass, new JobConf)
+  }
+
+  def nextRecord(reuse: (K, V)): (K, V) = {
+    if (!fetched) {
+      fetchNext()
+    }
+    if (!hasNext) {
+      null
+    } else {
+      fetched = false
+      (key, value)
+    }
+  }
+
+}
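
For context, a minimal usage sketch of the mapred input wrapper above (not part of this commit; the object name, the TextInputFormat choice, and the input path are illustrative assumptions):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

    object ReadWithMapredWrapper {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Configure a classic "mapred" TextInputFormat; the path is a placeholder.
        val jobConf = new JobConf()
        FileInputFormat.addInputPath(jobConf, new Path("hdfs:///tmp/input"))

        val format = new HadoopInputFormat[LongWritable, Text](
          new TextInputFormat, classOf[LongWritable], classOf[Text], jobConf)

        // The wrapper emits Scala tuples (K, V), here (LongWritable, Text).
        val lines = env.createInput(format)
        lines.map(_._2.toString).first(10).print()
      }
    }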

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
new file mode 100644
index 0000000..b786651
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
+import org.apache.hadoop.mapred.{JobConf, OutputCommitter, OutputFormat}
+
+@Public
+class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
+  extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
+
+  def this(
+      mapredOutputFormat: OutputFormat[K, V],
+      outputCommitterClass: Class[OutputCommitter],
+      job: JobConf) {
+
+    this(mapredOutputFormat, job)
+    this.getJobConf.setOutputCommitter(outputCommitterClass)
+  }
+
+  def writeRecord(record: (K, V)) {
+    this.recordWriter.write(record._1, record._2)
+  }
+}
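
A corresponding sketch for the mapred output wrapper above (again illustrative: the object name, TextOutputFormat choice, and output path are assumptions, not part of the commit):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

    object WriteWithMapredWrapper {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Some (Text, IntWritable) pairs to emit through the Hadoop OutputFormat.
        val words = env.fromElements("to", "be", "or", "not", "to", "be")
        val pairs = words.map(w => (new Text(w), new IntWritable(1)))

        val jobConf = new JobConf()
        FileOutputFormat.setOutputPath(jobConf, new Path("hdfs:///tmp/output"))
        val outputFormat = new HadoopOutputFormat[Text, IntWritable](
          new TextOutputFormat[Text, IntWritable], jobConf)

        // writeRecord(...) above forwards each tuple to the Hadoop RecordWriter.
        pairs.output(outputFormat)
        env.execute("write via mapred wrapper")
      }
    }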

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
new file mode 100644
index 0000000..8efb53d
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase
+import org.apache.hadoop.mapreduce.{InputFormat, Job}
+
+@Public
+class HadoopInputFormat[K, V](
+    mapredInputFormat: InputFormat[K, V],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    job: Job)
+  extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) {
+
+  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]) = {
+    this(mapredInputFormat, keyClass, valueClass, Job.getInstance())
+  }
+
+  def nextRecord(reuse: (K, V)): (K, V) = {
+    if (!fetched) {
+      fetchNext()
+    }
+    if (!hasNext) {
+      null
+    } else {
+      fetched = false
+      (recordReader.getCurrentKey, recordReader.getCurrentValue)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
new file mode 100644
index 0000000..8095304
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase
+import org.apache.hadoop.mapreduce.{Job, OutputFormat}
+
+@Public
+class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: Job)
+  extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
+
+  def writeRecord(record: (K, V)) {
+    this.recordWriter.write(record._1, record._2)
+  }
+}
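
The new-API ("mapreduce") wrappers above follow the same pattern, but configuration travels with a Job instead of a JobConf. A hedged round-trip sketch, with object name, paths, and formats as illustrative assumptions:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.hadoop.mapreduce.{HadoopInputFormat, HadoopOutputFormat}
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
    import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

    object MapreduceWrapperRoundTrip {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Input side: the Job instance carries the input path and format configuration.
        val inputJob = Job.getInstance()
        FileInputFormat.addInputPath(inputJob, new Path("hdfs:///tmp/input"))
        val source = new HadoopInputFormat[LongWritable, Text](
          new TextInputFormat, classOf[LongWritable], classOf[Text], inputJob)
        val lines = env.createInput(source)

        // Output side: the Job carries the output path used by the output committer.
        val outputJob = Job.getInstance()
        FileOutputFormat.setOutputPath(outputJob, new Path("hdfs:///tmp/output"))
        val sink = new HadoopOutputFormat[LongWritable, Text](
          new TextOutputFormat[LongWritable, Text], outputJob)

        lines.output(sink)
        env.execute("mapreduce wrapper round trip")
      }
    }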

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
new file mode 100644
index 0000000..bfee273
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link HadoopInputFormat}.
+ */
+public class HadoopInputFormatTest {
+
+	@Test
+	public void testConfigureWithConfigurableInstance() {
+		ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		verify(inputFormat, times(1)).setConf(any(JobConf.class));
+
+		hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
+		verify(inputFormat, times(2)).setConf(any(JobConf.class));
+	}
+
+	@Test
+	public void testConfigureWithJobConfigurableInstance() {
+		JobConfigurableDummyInputFormat inputFormat = mock(JobConfigurableDummyInputFormat.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		verify(inputFormat, times(1)).configure(any(JobConf.class));
+
+		hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
+		verify(inputFormat, times(2)).configure(any(JobConf.class));
+	}
+
+	@Test
+	public void testOpenClose() throws Exception {
+		DummyRecordReader recordReader = mock(DummyRecordReader.class);
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+		when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		hadoopInputFormat.open(getHadoopInputSplit());
+
+		verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
+		verify(recordReader, times(1)).createKey();
+		verify(recordReader, times(1)).createValue();
+
+		assertThat(hadoopInputFormat.fetched, is(false));
+
+		hadoopInputFormat.close();
+		verify(recordReader, times(1)).close();
+	}
+
+	@Test
+	public void testOpenWithConfigurableReader() throws Exception {
+		ConfigurableDummyRecordReader recordReader = mock(ConfigurableDummyRecordReader.class);
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+		when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		hadoopInputFormat.open(getHadoopInputSplit());
+
+		verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
+		verify(recordReader, times(1)).setConf(any(JobConf.class));
+		verify(recordReader, times(1)).createKey();
+		verify(recordReader, times(1)).createValue();
+
+		assertThat(hadoopInputFormat.fetched, is(false));
+
+	}
+
+	@Test
+	public void testCreateInputSplits() throws Exception {
+
+		FileSplit[] result = new FileSplit[1];
+		result[0] = getFileSplit();
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+		when(inputFormat.getSplits(any(JobConf.class), anyInt())).thenReturn(result);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		hadoopInputFormat.createInputSplits(2);
+
+		verify(inputFormat, times(1)).getSplits(any(JobConf.class), anyInt());
+	}
+
+	@Test
+	public void testReachedEndWithElementsRemaining() throws IOException {
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(new DummyInputFormat(), String.class, Long.class, new JobConf());
+		hadoopInputFormat.fetched = true;
+		hadoopInputFormat.hasNext = true;
+
+		assertThat(hadoopInputFormat.reachedEnd(), is(false));
+	}
+
+	@Test
+	public void testReachedEndWithNoElementsRemaining() throws IOException {
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(new DummyInputFormat(), String.class, Long.class, new JobConf());
+		hadoopInputFormat.fetched = true;
+		hadoopInputFormat.hasNext = false;
+
+		assertThat(hadoopInputFormat.reachedEnd(), is(true));
+	}
+
+	@Test
+	public void testFetchNext() throws IOException {
+		DummyRecordReader recordReader = mock(DummyRecordReader.class);
+		when(recordReader.next(anyString(), anyLong())).thenReturn(true);
+
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+		when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		hadoopInputFormat.open(getHadoopInputSplit());
+		hadoopInputFormat.fetchNext();
+
+		verify(recordReader, times(1)).next(anyString(), anyLong());
+		assertThat(hadoopInputFormat.hasNext, is(true));
+		assertThat(hadoopInputFormat.fetched, is(true));
+	}
+
+	@Test
+	public void checkTypeInformation() throws Exception {
+		HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<>(
+				new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
+
+		TypeInformation<Tuple2<Void, Long>> tupleType = hadoopInputFormat.getProducedType();
+		TypeInformation<Tuple2<Void, Long>> expectedType = new TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+		assertThat(tupleType.isTupleType(), is(true));
+		assertThat(tupleType, is(equalTo(expectedType)));
+	}
+
+	@Test
+	public void testCloseWithoutOpen() throws Exception {
+		HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<>(
+			new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
+		hadoopInputFormat.close();
+	}
+
+	private HadoopInputSplit getHadoopInputSplit() {
+		return new HadoopInputSplit(1, getFileSplit(), new JobConf());
+	}
+
+	private FileSplit getFileSplit() {
+		return new FileSplit(new Path("path"), 1, 2, new String[]{});
+	}
+
+	private class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
+
+		public DummyVoidKeyInputFormat() {}
+
+		@Override
+		public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+			return null;
+		}
+	}
+
+	private class DummyRecordReader implements RecordReader<String, Long> {
+
+		@Override
+		public float getProgress() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public boolean next(String s, Long aLong) throws IOException {
+			return false;
+		}
+
+		@Override
+		public String createKey() {
+			return null;
+		}
+
+		@Override
+		public Long createValue() {
+			return null;
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public void close() throws IOException {
+
+		}
+	}
+
+	private class ConfigurableDummyRecordReader implements RecordReader<String, Long>, Configurable {
+
+		@Override
+		public void setConf(Configuration configuration) {}
+
+		@Override
+		public Configuration getConf() {
+			return null;
+		}
+
+		@Override
+		public boolean next(String s, Long aLong) throws IOException {
+			return false;
+		}
+
+		@Override
+		public String createKey() {
+			return null;
+		}
+
+		@Override
+		public Long createValue() {
+			return null;
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public void close() throws IOException {
+
+		}
+
+		@Override
+		public float getProgress() throws IOException {
+			return 0;
+		}
+	}
+
+	private class DummyInputFormat implements InputFormat<String, Long> {
+
+		@Override
+		public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
+			return new InputSplit[0];
+		}
+
+		@Override
+		public RecordReader<String, Long> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+			return null;
+		}
+	}
+
+	private class ConfigurableDummyInputFormat extends DummyInputFormat implements Configurable {
+		@Override
+		public void setConf(Configuration configuration) {}
+
+		@Override
+		public Configuration getConf() {
+			return null;
+		}
+	}
+
+	private class JobConfigurableDummyInputFormat extends DummyInputFormat implements JobConfigurable {
+
+		@Override
+		public void configure(JobConf jobConf) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
new file mode 100644
index 0000000..362a37f
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link HadoopOutputFormat}.
+ */
+public class HadoopOutputFormatTest {
+
+	@Test
+	public void testOpen() throws Exception {
+
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
+		JobConf jobConf = Mockito.spy(new JobConf());
+		when(jobConf.getOutputCommitter()).thenReturn(outputCommitter);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+
+		outputFormat.open(1, 1);
+
+		verify(jobConf, times(2)).getOutputCommitter();
+		verify(outputCommitter, times(1)).setupJob(any(JobContext.class));
+		verify(dummyOutputFormat, times(1)).getRecordWriter(any(FileSystem.class), any(JobConf.class), anyString(), any(Progressable.class));
+	}
+
+	@Test
+	public void testConfigureWithConfigurable() {
+		ConfigurableDummyOutputFormat dummyOutputFormat = mock(ConfigurableDummyOutputFormat.class);
+		JobConf jobConf = mock(JobConf.class);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+
+		outputFormat.configure(Matchers.<org.apache.flink.configuration.Configuration>any());
+
+		verify(dummyOutputFormat, times(1)).setConf(any(Configuration.class));
+	}
+
+	@Test
+	public void testConfigureWithJobConfigurable() {
+		JobConfigurableDummyOutputFormat dummyOutputFormat = mock(JobConfigurableDummyOutputFormat.class);
+		JobConf jobConf = mock(JobConf.class);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+
+		outputFormat.configure(Matchers.<org.apache.flink.configuration.Configuration>any());
+
+		verify(dummyOutputFormat, times(1)).configure(any(JobConf.class));
+	}
+
+	@Test
+	public void testCloseWithTaskCommit() throws Exception {
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
+		when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(true);
+		DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
+		JobConf jobConf = mock(JobConf.class);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+		outputFormat.recordWriter = recordWriter;
+		outputFormat.outputCommitter = outputCommitter;
+
+		outputFormat.close();
+
+		verify(recordWriter, times(1)).close(any(Reporter.class));
+		verify(outputCommitter, times(1)).commitTask(any(TaskAttemptContext.class));
+	}
+
+	@Test
+	public void testCloseWithoutTaskCommit() throws Exception {
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
+		when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(false);
+		DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
+		JobConf jobConf = mock(JobConf.class);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+		outputFormat.recordWriter = recordWriter;
+		outputFormat.outputCommitter = outputCommitter;
+
+		outputFormat.close();
+
+		verify(recordWriter, times(1)).close(any(Reporter.class));
+		verify(outputCommitter, times(0)).commitTask(any(TaskAttemptContext.class));
+	}
+
+	@Test
+	public void testWriteRecord() throws Exception {
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
+		JobConf jobConf = mock(JobConf.class);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+		outputFormat.recordWriter = recordWriter;
+
+		outputFormat.writeRecord(new Tuple2<>("key", 1L));
+
+		verify(recordWriter, times(1)).write(anyString(), anyLong());
+	}
+
+	@Test
+	public void testFinalizeGlobal() throws Exception {
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
+		JobConf jobConf = Mockito.spy(new JobConf());
+		when(jobConf.getOutputCommitter()).thenReturn(outputCommitter);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+
+		outputFormat.finalizeGlobal(1);
+
+		verify(outputCommitter, times(1)).commitJob(any(JobContext.class));
+	}
+
+	private class DummyOutputFormat implements OutputFormat<String, Long> {
+
+		@Override
+		public RecordWriter<String, Long> getRecordWriter(FileSystem fileSystem, JobConf jobConf, String s, Progressable progressable) throws IOException {
+			return null;
+		}
+
+		@Override
+		public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOException {
+
+		}
+	}
+
+	private class ConfigurableDummyOutputFormat extends DummyOutputFormat implements Configurable {
+
+		@Override
+		public void setConf(Configuration configuration) {
+
+		}
+
+		@Override
+		public Configuration getConf() {
+			return null;
+		}
+	}
+
+	private class JobConfigurableDummyOutputFormat extends DummyOutputFormat implements JobConfigurable {
+
+		@Override
+		public void configure(JobConf jobConf) {
+
+		}
+	}
+
+	private class DummyOutputCommitter extends OutputCommitter {
+
+		@Override
+		public void setupJob(JobContext jobContext) throws IOException {
+
+		}
+
+		@Override
+		public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+		}
+
+		@Override
+		public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+			return false;
+		}
+
+		@Override
+		public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+		}
+
+		@Override
+		public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+		}
+	}
+
+	private class DummyRecordWriter implements RecordWriter<String, Long> {
+
+		@Override
+		public void write(String s, Long aLong) throws IOException {
+
+		}
+
+		@Override
+		public void close(Reporter reporter) throws IOException {
+
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
new file mode 100644
index 0000000..941eb1f
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link HadoopInputFormat}.
+ */
+public class HadoopInputFormatTest {
+
+	@Rule
+	public final ExpectedException exception = ExpectedException.none();
+
+	@Test
+	public void testConfigure() throws Exception {
+
+		ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+		hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
+
+		verify(inputFormat, times(1)).setConf(any(Configuration.class));
+	}
+
+	@Test
+	public void testCreateInputSplits() throws Exception {
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+		hadoopInputFormat.createInputSplits(2);
+
+		verify(inputFormat, times(1)).getSplits(any(JobContext.class));
+	}
+
+	@Test
+	public void testOpen() throws Exception {
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+		when(inputFormat.createRecordReader(any(InputSplit.class), any(TaskAttemptContext.class))).thenReturn(new DummyRecordReader());
+		HadoopInputSplit inputSplit = mock(HadoopInputSplit.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+		hadoopInputFormat.open(inputSplit);
+
+		verify(inputFormat, times(1)).createRecordReader(any(InputSplit.class), any(TaskAttemptContext.class));
+		assertThat(hadoopInputFormat.fetched, is(false));
+	}
+
+	@Test
+	public void testClose() throws Exception {
+
+		DummyRecordReader recordReader = mock(DummyRecordReader.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+		hadoopInputFormat.close();
+
+		verify(recordReader, times(1)).close();
+	}
+
+	@Test
+	public void testCloseWithoutOpen() throws Exception {
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(new DummyInputFormat(), String.class, Long.class, Job.getInstance());
+		hadoopInputFormat.close();
+	}
+
+	@Test
+	public void testFetchNextInitialState() throws Exception {
+		DummyRecordReader recordReader = new DummyRecordReader();
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+		hadoopInputFormat.fetchNext();
+
+		assertThat(hadoopInputFormat.fetched, is(true));
+		assertThat(hadoopInputFormat.hasNext, is(false));
+	}
+
+	@Test
+	public void testFetchNextRecordReaderHasNewValue() throws Exception {
+
+		DummyRecordReader recordReader = mock(DummyRecordReader.class);
+		when(recordReader.nextKeyValue()).thenReturn(true);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+		hadoopInputFormat.fetchNext();
+
+		assertThat(hadoopInputFormat.fetched, is(true));
+		assertThat(hadoopInputFormat.hasNext, is(true));
+	}
+
+	@Test
+	public void testFetchNextRecordReaderThrowsException() throws Exception {
+
+		DummyRecordReader recordReader = mock(DummyRecordReader.class);
+		when(recordReader.nextKeyValue()).thenThrow(new InterruptedException());
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+
+		exception.expect(IOException.class);
+		hadoopInputFormat.fetchNext();
+
+		assertThat(hadoopInputFormat.hasNext, is(true));
+	}
+
+	@Test
+	public void checkTypeInformation() throws Exception {
+
+		HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<>(
+				new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, Job.getInstance());
+
+		TypeInformation<Tuple2<Void, Long>> tupleType = hadoopInputFormat.getProducedType();
+		TypeInformation<Tuple2<Void, Long>> expectedType = new TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+		assertThat(tupleType.isTupleType(), is(true));
+		assertThat(tupleType, is(equalTo(expectedType)));
+	}
+
+	private HadoopInputFormat<String, Long> setupHadoopInputFormat(InputFormat<String, Long> inputFormat, Job job,
+																	RecordReader<String, Long> recordReader) {
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat,
+				String.class, Long.class, job);
+		hadoopInputFormat.recordReader = recordReader;
+
+		return hadoopInputFormat;
+	}
+
+	private class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
+
+		public DummyVoidKeyInputFormat() {}
+
+		@Override
+		public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+			return null;
+		}
+	}
+
+	private class DummyRecordReader extends RecordReader<String, Long> {
+
+		@Override
+		public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+
+		}
+
+		@Override
+		public boolean nextKeyValue() throws IOException, InterruptedException {
+			return false;
+		}
+
+		@Override
+		public String getCurrentKey() throws IOException, InterruptedException {
+			return null;
+		}
+
+		@Override
+		public Long getCurrentValue() throws IOException, InterruptedException {
+			return null;
+		}
+
+		@Override
+		public float getProgress() throws IOException, InterruptedException {
+			return 0;
+		}
+
+		@Override
+		public void close() throws IOException {
+
+		}
+	}
+
+	private class DummyInputFormat extends InputFormat<String, Long> {
+
+		@Override
+		public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
+			return null;
+		}
+
+		@Override
+		public RecordReader<String, Long> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+			return new DummyRecordReader();
+		}
+	}
+
+	private class ConfigurableDummyInputFormat extends DummyInputFormat implements Configurable {
+
+		@Override
+		public void setConf(Configuration configuration) {}
+
+		@Override
+		public Configuration getConf() {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
new file mode 100644
index 0000000..8e2329a
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link HadoopOutputFormat}.
+ */
+public class HadoopOutputFormatTest {
+
+	private static final String MAPRED_OUTPUT_PATH = "an/ignored/file/";
+	private static final String MAPRED_OUTPUT_DIR_KEY = "mapred.output.dir";
+
+	@Test
+	public void testWriteRecord() throws Exception {
+
+		RecordWriter<String, Long> recordWriter = mock(DummyRecordWriter.class);
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
+			Job.getInstance(), recordWriter, null, new Configuration());
+
+		hadoopOutputFormat.writeRecord(new Tuple2<String, Long>());
+
+		verify(recordWriter, times(1)).write(anyString(), anyLong());
+	}
+
+	@Test
+	public void testOpen() throws Exception {
+
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		OutputCommitter outputCommitter = setupOutputCommitter(true);
+		when(dummyOutputFormat.getOutputCommitter(any(TaskAttemptContext.class))).thenReturn(outputCommitter);
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(dummyOutputFormat,
+			Job.getInstance(), new DummyRecordWriter(), setupOutputCommitter(true), new Configuration());
+
+		hadoopOutputFormat.open(1, 4);
+
+		verify(hadoopOutputFormat.outputCommitter, times(1)).setupJob(any(JobContext.class));
+		verify(hadoopOutputFormat.mapreduceOutputFormat, times(1)).getRecordWriter(any(TaskAttemptContext.class));
+	}
+
+	@Test
+	public void testCloseWithNeedsTaskCommitTrue() throws Exception {
+
+		RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
+		OutputCommitter outputCommitter = setupOutputCommitter(true);
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
+			Job.getInstance(), recordWriter, outputCommitter, new Configuration());
+
+		hadoopOutputFormat.close();
+
+		verify(outputCommitter, times(1)).commitTask(any(TaskAttemptContext.class));
+		verify(recordWriter, times(1)).close(any(TaskAttemptContext.class));
+	}
+
+	@Test
+	public void testCloseWithNeedsTaskCommitFalse() throws Exception {
+
+		RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
+		OutputCommitter outputCommitter = setupOutputCommitter(false);
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
+			Job.getInstance(), recordWriter, outputCommitter, new Configuration());
+
+		hadoopOutputFormat.close();
+
+		verify(outputCommitter, times(0)).commitTask(any(TaskAttemptContext.class));
+		verify(recordWriter, times(1)).close(any(TaskAttemptContext.class));
+	}
+
+	@Test
+	public void testConfigure() throws Exception {
+
+		ConfigurableDummyOutputFormat outputFormat = mock(ConfigurableDummyOutputFormat.class);
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(outputFormat, Job.getInstance(),
+			null, null, new Configuration());
+
+		hadoopOutputFormat.configure(new org.apache.flink.configuration.Configuration());
+
+		verify(outputFormat, times(1)).setConf(any(Configuration.class));
+	}
+
+	@Test
+	public void testFinalizedGlobal() throws Exception {
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
+			Job.getInstance(), null, null, new Configuration());
+
+		hadoopOutputFormat.finalizeGlobal(1);
+
+		verify(hadoopOutputFormat.outputCommitter, times(1)).commitJob(any(JobContext.class));
+	}
+
+	private OutputCommitter setupOutputCommitter(boolean needsTaskCommit) throws IOException {
+		OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
+		when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(needsTaskCommit);
+		doNothing().when(outputCommitter).commitTask(any(TaskAttemptContext.class));
+
+		return outputCommitter;
+	}
+
+	private HadoopOutputFormat<String, Long> setupHadoopOutputFormat(
+		OutputFormat<String, Long> outputFormat,
+		Job job,
+		RecordWriter<String, Long> recordWriter,
+		OutputCommitter outputCommitter,
+		Configuration configuration) {
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(outputFormat, job);
+		hadoopOutputFormat.recordWriter = recordWriter;
+		hadoopOutputFormat.outputCommitter = outputCommitter;
+		hadoopOutputFormat.configuration = configuration;
+		hadoopOutputFormat.configuration.set(MAPRED_OUTPUT_DIR_KEY, MAPRED_OUTPUT_PATH);
+
+		return hadoopOutputFormat;
+	}
+
+	class DummyRecordWriter extends RecordWriter<String, Long> {
+		@Override
+		public void write(String key, Long value) throws IOException, InterruptedException {
+		}
+
+		@Override
+		public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+
+		}
+	}
+
+	class DummyOutputFormat extends OutputFormat<String, Long> {
+		@Override
+		public RecordWriter<String, Long> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+			return null;
+		}
+
+		@Override
+		public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+
+		}
+
+		@Override
+		public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+			final OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
+			doNothing().when(outputCommitter).setupJob(any(JobContext.class));
+
+			return outputCommitter;
+		}
+	}
+
+	class ConfigurableDummyOutputFormat extends DummyOutputFormat implements Configurable {
+
+		@Override
+		public void setConf(Configuration configuration) {}
+
+		@Override
+		public Configuration getConf() {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
new file mode 100644
index 0000000..bbe6395
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * Integration tests for Hadoop IO formats.
+ */
+@RunWith(Parameterized.class)
+public class HadoopIOFormatsITCase extends JavaProgramTestBase {
+
+	private static final int NUM_PROGRAMS = 2;
+
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String[] resultPath;
+	private String[] expectedResult;
+	private String sequenceFileInPath;
+	private String sequenceFileInPathNull;
+
+	public HadoopIOFormatsITCase(Configuration config) {
+		super(config);
+	}
+
+	@Before
+	public void checkOperatingSystem() {
+		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") };
+
+		File sequenceFile = createAndRegisterTempFile("seqFile");
+		sequenceFileInPath = sequenceFile.toURI().toString();
+
+		// Create a sequence file
+		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+		FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
+		Path path = new Path(sequenceFile.getAbsolutePath());
+
+		//  ------------------ Long / Text Key Value pair: ------------
+		int kvCount = 4;
+
+		LongWritable key = new LongWritable();
+		Text value = new Text();
+		SequenceFile.Writer writer = null;
+		try {
+			writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
+			for (int i = 0; i < kvCount; i++) {
+				if (i == 1) {
+					// write key = 1 a bit more often.
+					for (int a = 0; a < 15; a++) {
+						key.set(i);
+						value.set(i + " - somestring");
+						writer.append(key, value);
+					}
+				}
+				key.set(i);
+				value.set(i + " - somestring");
+				writer.append(key, value);
+			}
+		} finally {
+			IOUtils.closeStream(writer);
+		}
+
+		//  ------------------ Null Key / Long Value pair: ------------
+
+		File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
+		sequenceFileInPathNull = sequenceFileNull.toURI().toString();
+		path = new Path(sequenceFileInPathNull);
+
+		LongWritable value1 = new LongWritable();
+		SequenceFile.Writer writer1 = null;
+		try {
+			writer1 = SequenceFile.createWriter(fs, conf, path, NullWritable.class, value1.getClass());
+			for (int i = 0; i < kvCount; i++) {
+				value1.set(i);
+				writer1.append(NullWritable.get(), value1);
+			}
+		} finally {
+			IOUtils.closeStream(writer1);
+		}
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (int i = 0; i < resultPath.length; i++) {
+			compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
+		}
+	}
+
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for (int i = 1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+
+		return TestBaseUtils.toParameterList(tConfigs);
+	}
+
+	private static class HadoopIOFormatPrograms {
+
+		public static String[] runProgram(int progId, String[] resultPath, String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
+
+			switch(progId) {
+			case 1: {
+				/**
+				 * Test sequence file, including a key access.
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<LongWritable, Text>();
+				JobConf hdconf = new JobConf();
+				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPath));
+				HadoopInputFormat<LongWritable, Text> hif = new HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, hdconf);
+				DataSet<Tuple2<LongWritable, Text>> ds = env.createInput(hif);
+				DataSet<Tuple2<Long, Text>> sumed = ds.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
+					@Override
+					public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
+						return new Tuple2<Long, Text>(value.f0.get(), value.f1);
+					}
+				}).sum(0);
+				sumed.writeAsText(resultPath[0]);
+				DataSet<String> res = ds.distinct(0).map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
+					@Override
+					public String map(Tuple2<LongWritable, Text> value) throws Exception {
+						return value.f1 + " - " + value.f0.get();
+					}
+				});
+				res.writeAsText(resultPath[1]);
+				env.execute();
+
+				// return expected result
+				return 	new String [] {"(21,3 - somestring)", "0 - somestring - 0\n" +
+						"1 - somestring - 1\n" +
+						"2 - somestring - 2\n" +
+						"3 - somestring - 3\n"};
+
+			}
+			case 2: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				SequenceFileInputFormat<NullWritable, LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
+				JobConf hdconf = new JobConf();
+				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPathNull));
+				HadoopInputFormat<NullWritable, LongWritable> hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, NullWritable.class, LongWritable.class, hdconf);
+				DataSet<Tuple2<NullWritable, LongWritable>> ds = env.createInput(hif);
+				DataSet<Tuple2<Void, Long>> res = ds.map(new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
+					@Override
+					public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception {
+						return new Tuple2<Void, Long>(null, value.f1.get());
+					}
+				});
+				DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1);
+				res1.writeAsText(resultPath[1]);
+				res.writeAsText(resultPath[0]);
+				env.execute();
+
+				// return expected result
+				return 	new String [] {"(null,2)\n" +
+						"(null,0)\n" +
+						"(null,1)\n" +
+						"(null,3)",
+						"(null,0)\n" +
+						"(null,1)\n" +
+						"(null,2)\n" +
+						"(null,3)"};
+			}
+			default:
+				throw new IllegalArgumentException("Invalid program id");
+			}
+
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/WordCountMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/WordCountMapredITCase.java
new file mode 100644
index 0000000..53927e7
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/WordCountMapredITCase.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.HadoopInputs;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.junit.Assume;
+import org.junit.Before;
+
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
+
+/**
+ * Test WordCount with Hadoop input and output "mapred" (legacy) formats.
+ */
+public class WordCountMapredITCase extends JavaProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Before
+	public void checkOperatingSystem() {
+		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[] {".", "_"});
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		internalRun(true);
+		postSubmit();
+		resultPath = getTempDirPath("result2");
+		internalRun(false);
+		postSubmit();
+	}
+
+	private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<LongWritable, Text>> input;
+
+		if (isTestDeprecatedAPI) {
+			input = env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath));
+		} else {
+			input = env.createInput(readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath));
+		}
+
+		DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
+			@Override
+			public String map(Tuple2<LongWritable, Text> value) throws Exception {
+				return value.f1.toString();
+			}
+		});
+
+		DataSet<Tuple2<String, Integer>> counts =
+				// split up the lines into pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.groupBy(0)
+						.sum(1);
+
+		DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
+
+			@Override
+			public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception {
+				return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1));
+			}
+		});
+
+		// Set up Hadoop Output Format
+		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
+				new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
+		hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
+		TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(resultPath));
+
+		// Output & Execute
+		words.output(hadoopOutputFormat);
+		env.execute("Hadoop Compat WordCount");
+	}
+
+	static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			String[] tokens = value.toLowerCase().split("\\W+");
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<>(token, 1));
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/WordCountMapreduceITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/WordCountMapreduceITCase.java
new file mode 100644
index 0000000..be70782
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/WordCountMapreduceITCase.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapreduce;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.HadoopInputs;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Assume;
+import org.junit.Before;
+
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
+
+/**
+ * Test WordCount with Hadoop input and output "mapreduce" (modern) formats.
+ */
+public class WordCountMapreduceITCase extends JavaProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Before
+	public void checkOperatingSystem() {
+		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[] {".", "_"});
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		internalRun(true);
+		postSubmit();
+		resultPath = getTempDirPath("result2");
+		internalRun(false);
+		postSubmit();
+	}
+
+	private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<LongWritable, Text>> input;
+		if (isTestDeprecatedAPI) {
+			input = env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath));
+		} else {
+			input = env.createInput(readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath));
+		}
+
+		DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
+			@Override
+			public String map(Tuple2<LongWritable, Text> value) throws Exception {
+				return value.f1.toString();
+			}
+		});
+
+		DataSet<Tuple2<String, Integer>> counts =
+				// split up the lines into pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.groupBy(0)
+						.sum(1);
+
+		DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
+
+			@Override
+			public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception {
+				return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1));
+			}
+		});
+
+		// Set up Hadoop Output Format
+		Job job = Job.getInstance();
+		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
+				new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), job);
+		job.getConfiguration().set("mapred.textoutputformat.separator", " ");
+		TextOutputFormat.setOutputPath(job, new Path(resultPath));
+
+		// Output & Execute
+		words.output(hadoopOutputFormat);
+		env.execute("Hadoop Compat WordCount");
+	}
+
+	static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			String[] tokens = value.toLowerCase().split("\\W+");
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<>(token, 1));
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala
new file mode 100644
index 0000000..5aaf379
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.hadoopcompatibility.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
+import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
+import org.apache.flink.util.OperatingSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextInputFormat, TextOutputFormat}
+import org.junit.{Assume, Before}
+
+class WordCountMapredITCase extends JavaProgramTestBase {
+  protected var textPath: String = null
+  protected var resultPath: String = null
+
+  @Before
+  def checkOperatingSystem() {
+    // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+    Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows)
+  }
+
+  protected override def preSubmit() {
+    textPath = createTempFile("text.txt", WordCountData.TEXT)
+    resultPath = getTempDirPath("result")
+  }
+
+  protected override def postSubmit() {
+    TestBaseUtils.compareResultsByLinesInMemory(WordCountData.COUNTS,
+                                                resultPath, Array[String](".", "_"))
+  }
+
+  private def internalRun(testDeprecatedAPI: Boolean): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val input =
+      if (testDeprecatedAPI) {
+        env.createInput(
+          HadoopInputs.readHadoopFile(
+            new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
+      } else {
+        env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
+          classOf[Text], textPath))
+      }
+
+    val counts = input
+      .map(_._2.toString)
+      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
+      .groupBy(0)
+      .sum(1)
+
+    val words = counts
+      .map( t => (new Text(t._1), new LongWritable(t._2)) )
+
+    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
+      new TextOutputFormat[Text, LongWritable],
+      new JobConf)
+    hadoopOutputFormat.getJobConf.set("mapred.textoutputformat.separator", " ")
+
+    FileOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf, new Path(resultPath))
+
+    words.output(hadoopOutputFormat)
+
+    env.execute("Hadoop Compat WordCount")
+  }
+
+  protected def testProgram() {
+    internalRun(testDeprecatedAPI = true)
+    postSubmit()
+    resultPath = getTempDirPath("result2")
+    internalRun(testDeprecatedAPI = false)
+    postSubmit()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapreduceITCase.scala b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapreduceITCase.scala
new file mode 100644
index 0000000..61e7a12
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapreduceITCase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.hadoopcompatibility.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
+import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
+import org.apache.flink.util.OperatingSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+import org.junit.{Assume, Before}
+
+class WordCountMapreduceITCase extends JavaProgramTestBase {
+  protected var textPath: String = null
+  protected var resultPath: String = null
+
+  @Before
+  def checkOperatingSystem() {
+    // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+    Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows)
+  }
+
+  protected override def preSubmit() {
+    textPath = createTempFile("text.txt", WordCountData.TEXT)
+    resultPath = getTempDirPath("result")
+  }
+
+  protected override def postSubmit() {
+    TestBaseUtils.compareResultsByLinesInMemory(WordCountData.COUNTS,
+                                                resultPath, Array[String](".", "_"))
+  }
+
+  protected def testProgram() {
+    internalRun(testDeprecatedAPI = true)
+    postSubmit()
+    resultPath = getTempDirPath("result2")
+    internalRun(testDeprecatedAPI = false)
+    postSubmit()
+  }
+
+  private def internalRun(testDeprecatedAPI: Boolean): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val input =
+      if (testDeprecatedAPI) {
+        env.createInput(
+          HadoopInputs.readHadoopFile(
+            new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
+      } else {
+        env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
+          classOf[Text], textPath))
+      }
+
+    val counts = input
+      .map(_._2.toString)
+      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
+      .groupBy(0)
+      .sum(1)
+
+    val words = counts
+      .map( t => (new Text(t._1), new LongWritable(t._2)) )
+
+    val job = Job.getInstance()
+    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
+      new TextOutputFormat[Text, LongWritable],
+      job)
+    hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " ")
+
+    FileOutputFormat.setOutputPath(job, new Path(resultPath))
+
+    words.output(hadoopOutputFormat)
+
+    env.execute("Hadoop Compat WordCount")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 089c90b..02acfc6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -33,7 +33,6 @@ import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
 import org.apache.flink.api.java.io.IteratorInputFormat;
@@ -61,8 +60,6 @@ import org.apache.flink.util.SplittableIterator;
 import org.apache.flink.util.Visitor;
 
 import com.esotericsoftware.kryo.Serializer;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -582,109 +579,6 @@ public abstract class ExecutionEnvironment {
 		return new DataSource<>(this, inputFormat, producedType, Utils.getCallLocationName());
 	}
 
-	// ----------------------------------- Hadoop Input Format ---------------------------------------
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String, JobConf)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
-		DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
-
-		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
-
-		return result;
-	}
-
-	/**
-	 * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat}
-	 * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readSequenceFile(Class<K>, Class<V>, String)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
-		return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
-	}
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
-	 * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
-		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
-	}
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V>, Class<K>, Class<V>, JobConf)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
-
-		return this.createInput(hadoopInputFormat);
-	}
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
-	 * given inputName is set on the given job.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String, Job)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
-		DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);
-
-		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
-				.hadoop.fs.Path(inputPath));
-
-		return result;
-	}
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
-	 * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
-	 *
-	 * @deprecated Please use {@code  org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
-		return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
-	}
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V>, Class<K>, Class<V>, Job)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
-		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
-
-		return this.createInput(hadoopInputFormat);
-	}
-
 	// ----------------------------------- Collection ---------------------------------------
 
 	/**

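The javadocs on the removed methods point callers to org.apache.flink.hadoopcompatibility.HadoopInputs. Below is a minimal migration sketch for users of the removed createHadoopInput(...): the deprecated ExecutionEnvironment call becomes env.createInput(HadoopInputs.createHadoopInput(...)). The mapred TextInputFormat and the input path are placeholders chosen for illustration:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class HadoopInputMigrationSketch {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            JobConf conf = new JobConf();
            // Placeholder input path; set it on the JobConf as before.
            FileInputFormat.addInputPath(conf, new Path("hdfs:///path/to/input"));

            // Was: env.createHadoopInput(new TextInputFormat(), LongWritable.class, Text.class, conf)
            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                HadoopInputs.createHadoopInput(new TextInputFormat(), LongWritable.class, Text.class, conf));

            input.print();
        }
    }
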
http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java
deleted file mode 100644
index bee331c..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.common;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.core.io.InputSplit;
-
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-/**
- * A common base for both "mapred" and "mapreduce" Hadoop input formats.
- */
-@Internal
-public abstract class HadoopInputFormatCommonBase<T, SPITTYPE extends InputSplit> extends RichInputFormat<T, SPITTYPE> {
-	protected transient Credentials credentials;
-
-	protected HadoopInputFormatCommonBase(Credentials creds) {
-		this.credentials = creds;
-	}
-
-	protected void write(ObjectOutputStream out) throws IOException {
-		this.credentials.write(out);
-	}
-
-	public void read(ObjectInputStream in) throws IOException {
-		this.credentials = new Credentials();
-		credentials.readFields(in);
-	}
-
-	/**
-	 * This method only exists because there is no UserGroupInformation.getCredentials() method
-	 * in Hadoop 1.x
-	 *
-	 * <p>Note that this method returns "null" in Hadoop 1.x environments.
-	 *
-	 * @param ugi The user information
-	 * @return new credentials object from the user information. MAY RETURN NULL!
-	 */
-	public static Credentials getCredentialsFromUGI(UserGroupInformation ugi) {
-		Method getCredentialsMethod = null;
-		for (Method m : ugi.getClass().getMethods()) {
-			if (m.getName().equals("getCredentials")) {
-				getCredentialsMethod = m;
-				break;
-			}
-		}
-		if (getCredentialsMethod == null) {
-			return null;
-		} else {
-			try {
-				return (Credentials) getCredentialsMethod.invoke(ugi);
-			} catch (InvocationTargetException | IllegalAccessException e) {
-				throw new RuntimeException("Unable to get credentials from UserGroupInformation. This is only supported by Hadoop 2.2.0+");
-			}
-		}
-	}
-}
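
The reflective getCredentialsFromUGI(...) in the class removed above existed, per its own comment, only because Hadoop 1.x had no UserGroupInformation.getCredentials(). Where that method is available (Hadoop 2.2.0+, as the exception message notes), the reflective lookup is unnecessary and a direct call suffices. A minimal sketch, not part of this patch:

    import java.io.IOException;

    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    public class CredentialsSketch {
        /** Returns the credentials of the current user via the direct Hadoop 2.x API. */
        public static Credentials currentCredentials() throws IOException {
            UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
            return ugi.getCredentials();
        }
    }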

http://git-wip-us.apache.org/repos/asf/flink/blob/4beff13e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java
deleted file mode 100644
index 8f45612..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.hadoop.common;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.io.RichOutputFormat;
-
-import org.apache.hadoop.security.Credentials;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-/**
- * A common base for both "mapred" and "mapreduce" Hadoop output formats.
- *
- * <p>The base is taking care of handling (serializing) security credentials.
- */
-@Internal
-public abstract class HadoopOutputFormatCommonBase<T> extends RichOutputFormat<T> {
-	protected transient Credentials credentials;
-
-	protected HadoopOutputFormatCommonBase(Credentials creds) {
-		this.credentials = creds;
-	}
-
-	protected void write(ObjectOutputStream out) throws IOException {
-		this.credentials.write(out);
-	}
-
-	public void read(ObjectInputStream in) throws IOException {
-		this.credentials = new Credentials();
-		credentials.readFields(in);
-	}
-}