Posted to commits@flink.apache.org by fh...@apache.org on 2016/11/02 20:46:43 UTC

[5/5] flink git commit: [FLINK-4315] [dataSet] [hadoopCompat] Annotate Hadoop-related methods in ExecutionEnvironment as @Deprecated.

[FLINK-4315] [dataSet] [hadoopCompat] Annotate Hadoop-related methods in ExecutionEnvironment as @Deprecated.

- Preparation to remove Hadoop dependency from flink-java
- Alternatives for the deprecated functionality are provided in flink-hadoop-compatibility via HadoopInputs (see the usage sketch below)

This closes #2637.
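
For reference, a minimal Java sketch of the migration this commit prepares for: the deprecated ExecutionEnvironment.readHadoopFile() call is replaced by HadoopInputs.readHadoopFile() from flink-hadoop-compatibility combined with env.createInput(). The class name and input path below are illustrative only.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class HadoopInputsMigrationExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Deprecated by this commit:
            // DataSet<Tuple2<LongWritable, Text>> input =
            //     env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///path/to/input");

            // Replacement via flink-hadoop-compatibility (illustrative path):
            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                HadoopInputs.readHadoopFile(
                    new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///path/to/input"));

            input.print();
        }
    }

The same pattern is exercised by the updated WordCountMapredITCase and WordCountMapreduceITCase tests further down in this diff.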


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d61e1f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d61e1f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d61e1f2

Branch: refs/heads/master
Commit: 7d61e1f2fd0c9b0e3719b2d7252a164cfdf941c4
Parents: 4565170
Author: Evgeny_Kincharov <Ev...@epam.com>
Authored: Fri Oct 14 17:19:22 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 2 18:31:25 2016 +0100

----------------------------------------------------------------------
 .../flink-hadoop-compatibility/pom.xml          |  98 +++++++++++++
 .../flink/hadoopcompatibility/HadoopInputs.java | 118 +++++++++++++++
 .../flink/hadoopcompatibility/HadoopUtils.java  |  52 +++++++
 .../scala/HadoopInputs.scala                    | 143 +++++++++++++++++++
 .../hadoopcompatibility/HadoopUtilsTest.java    |  34 +++++
 flink-java/pom.xml                              |   2 +-
 .../flink/api/java/ExecutionEnvironment.java    |  33 ++++-
 .../flink/api/java/utils/ParameterTool.java     |   3 +
 .../java/utils/AbstractParameterToolTest.java   |  71 +++++++++
 .../flink/api/java/utils/ParameterToolTest.java |  46 +-----
 .../flink/api/scala/ExecutionEnvironment.scala  |  37 ++++-
 .../hadoop/mapred/WordCountMapredITCase.java    |  16 ++-
 .../mapreduce/WordCountMapreduceITCase.java     |  17 ++-
 .../hadoop/mapred/WordCountMapredITCase.scala   |  35 +++--
 .../mapreduce/WordCountMapreduceITCase.scala    |  26 +++-
 15 files changed, 661 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
index aa818f6..8143a03 100644
--- a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
+++ b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
@@ -47,6 +47,21 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>${shading-artifact.name}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
@@ -78,6 +93,89 @@ under the License.
 				<groupId>com.github.siom79.japicmp</groupId>
 				<artifactId>japicmp-maven-plugin</artifactId>
 			</plugin>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Scala Code Style, most of the configuration done via plugin management -->
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<configuration>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
new file mode 100644
index 0000000..9e8a3e4
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/**
+ * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
+ *
+ * It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat}
+ * and {@link org.apache.hadoop.mapreduce.InputFormat}.
+ *
+ * Key value pairs produced by the Hadoop InputFormats are converted into Flink
+ * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value.
+ *
+ */
+
+public final class HadoopInputs {
+	// ----------------------------------- Hadoop Input Format ---------------------------------------
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+		// set input path in JobConf
+		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+		// return wrapping InputFormat
+		return createHadoopInput(mapredInputFormat, key, value, job);
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} to read a Hadoop sequence file for the given key and value classes.
+	 *
+	 * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
+		return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.InputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+		return new HadoopInputFormat<>(mapredInputFormat, key, value, job);
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException
+	{
+		// set input path in Job
+		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+		// return wrapping InputFormat
+		return createHadoopInput(mapreduceInputFormat, key, value, job);
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException
+	{
+		return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.InputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+	 */
+	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
+			org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job)
+	{
+		return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
new file mode 100644
index 0000000..97ca329
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.commons.cli.Option;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class to work with Apache Hadoop libraries.
+ */
+public class HadoopUtils {
+	/**
+	 * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}
+	 *
+	 * @param args Input array arguments. It should be parsable by {@link GenericOptionsParser}
+	 * @return A {@link ParameterTool}
+	 * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
+	 * @see GenericOptionsParser
+	 */
+	public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException {
+		Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();
+		Map<String, String> map = new HashMap<String, String>();
+		for (Option option : options) {
+			String[] split = option.getValue().split("=");
+			map.put(split[0], split[1]);
+		}
+		return ParameterTool.fromMap(map);
+	}
+}
+
+

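A brief, hypothetical usage sketch for the new HadoopUtils helper, which replaces the deprecated ParameterTool.fromGenericOptionsParser; the argument values are illustrative and mirror the test below.

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.hadoopcompatibility.HadoopUtils;

    public class HadoopUtilsExample {
        public static void main(String[] args) throws Exception {
            // Parse Hadoop-style -D options into a Flink ParameterTool
            ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(
                new String[]{"-D", "input=myInput", "-DexpectedCount=15"});

            System.out.println(params.getRequired("input"));          // myInput
            System.out.println(params.getLong("expectedCount", -1L)); // 15
        }
    }
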
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
new file mode 100644
index 0000000..133a5f4
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.hadoopcompatibility.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.hadoop.mapreduce
+import org.apache.flink.api.scala.hadoop.mapred
+import org.apache.hadoop.fs.{Path => HadoopPath}
+import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
+import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat}
+
+/**
+  * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
+  *
+  * It provides methods to create Flink InputFormat wrappers for Hadoop
+  * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]].
+  *
+  * Key value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]] where
+  * the first field is the key and the second field is the value.
+  *
+  */
+object HadoopInputs {
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapred.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    // set input path in JobConf
+    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    // wrap mapredInputFormat
+    createHadoopInput(mapredInputFormat, key, value, job)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapred.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that reads a Hadoop sequence
+    * file with the given key and value classes.
+    */
+  def readSequenceFile[K, V](
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    readHadoopFile(
+      new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
+      key,
+      value,
+      inputPath
+    )
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapred.InputFormat]].
+    */
+  def createHadoopInput[K, V](
+      mapredInputFormat: MapredInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
+
+    // set input path in Job
+    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    // wrap mapreduceInputFormat
+    createHadoopInput(mapreduceInputFormat, key, value, job)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+    */
+  def readHadoopFile[K, V](
+      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] =
+  {
+    readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance)
+  }
+
+  /**
+    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+    * [[org.apache.hadoop.mapreduce.InputFormat]].
+    */
+  def createHadoopInput[K, V](
+      mapreduceInputFormat: MapreduceInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
+
+    new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
new file mode 100644
index 0000000..6f7673b
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.java.utils.AbstractParameterToolTest;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class HadoopUtilsTest extends AbstractParameterToolTest {
+
+	@Test
+	public void testParamsFromGenericOptionsParser() throws IOException {
+		ParameterTool parameter = HadoopUtils.paramsFromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"});
+		validate(parameter);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 6924da8..5bc81c6 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -84,7 +84,7 @@ under the License.
 				<artifactId>japicmp-maven-plugin</artifactId>
 			</plugin>
 
-			<!-- Because flink-scala and flink-avro uses it in tests -->
+			<!-- Because flink-scala, flink-avro and flink-hadoop-compatibility uses it in tests -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index add8531..964eed1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -585,9 +585,12 @@ public abstract class ExecutionEnvironment {
 	// ----------------------------------- Hadoop Input Format ---------------------------------------
 
 	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. The
-	 * given inputName is set on the given job.
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}.
+	 *
+	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String, JobConf)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
 		DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
@@ -600,7 +603,11 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat}
 	 * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
- 	 */
+	 *
+	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readSequenceFile(Class<K>, Class<V>, String)}
+	 * from the flink-hadoop-compatibility module.
+	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
 		return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
@@ -609,7 +616,11 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
 	 * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+	 *
+	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
 		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
@@ -617,7 +628,11 @@ public abstract class ExecutionEnvironment {
 
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
+	 *
+	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V>, Class<K>, Class<V>, JobConf)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
 		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
@@ -628,7 +643,11 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
 	 * given inputName is set on the given job.
+	 *
+	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String, Job)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
 		DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);
@@ -642,7 +661,11 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
 	 * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+	 *
+	 * @deprecated Please use {@link  org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
 		return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
@@ -650,7 +673,11 @@ public abstract class ExecutionEnvironment {
 
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
+	 *
+	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V>, Class<K>, Class<V>, Job)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
 		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 8f504e4..8e15441 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -191,7 +191,10 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 	 * @return A {@link ParameterTool}
 	 * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
 	 * @see GenericOptionsParser
+	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopUtils#paramsFromGenericOptionsParser(String[])}
+	 * from project flink-hadoop-compatibility
 	 */
+	@Deprecated
 	@PublicEvolving
 	public static ParameterTool fromGenericOptionsParser(String[] args) throws IOException {
 		Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
new file mode 100644
index 0000000..9aa9a95
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+public abstract class AbstractParameterToolTest {
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	protected void validate(ParameterTool parameter) {
+		ClosureCleaner.ensureSerializable(parameter);
+		Assert.assertEquals("myInput", parameter.getRequired("input"));
+		Assert.assertEquals("myDefaultValue", parameter.get("output", "myDefaultValue"));
+		Assert.assertEquals(null, parameter.get("whatever"));
+		Assert.assertEquals(15L, parameter.getLong("expectedCount", -1L));
+		Assert.assertTrue(parameter.getBoolean("thisIsUseful", true));
+		Assert.assertEquals(42, parameter.getByte("myDefaultByte", (byte) 42));
+		Assert.assertEquals(42, parameter.getShort("myDefaultShort", (short) 42));
+
+		Configuration config = parameter.getConfiguration();
+		Assert.assertEquals(15L, config.getLong("expectedCount", -1L));
+
+		Properties props = parameter.getProperties();
+		Assert.assertEquals("myInput", props.getProperty("input"));
+		props = null;
+
+		// -------- test the default file creation ------------
+		try {
+			String pathToFile = tmp.newFile().getAbsolutePath();
+			parameter.createPropertiesFile(pathToFile);
+			Properties defaultProps = new Properties();
+			try (FileInputStream fis = new FileInputStream(pathToFile)) {
+				defaultProps.load(fis);
+			}
+
+			Assert.assertEquals("myDefaultValue", defaultProps.get("output"));
+			Assert.assertEquals("-1", defaultProps.get("expectedCount"));
+			Assert.assertTrue(defaultProps.containsKey("input"));
+
+		} catch (IOException e) {
+			Assert.fail(e.getMessage());
+			e.printStackTrace();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
index 605f033..9b63985 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
@@ -18,25 +18,17 @@
 
 package org.apache.flink.api.java.utils;
 
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Map;
 import java.util.Properties;
 
-public class ParameterToolTest {
-
-	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
+public class ParameterToolTest extends AbstractParameterToolTest {
 
 	// ----- Parser tests -----------------
 
@@ -162,40 +154,4 @@ public class ParameterToolTest {
 		ParameterTool parameter = ParameterTool.fromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"});
 		validate(parameter);
 	}
-
-	private void validate(ParameterTool parameter) {
-		ClosureCleaner.ensureSerializable(parameter);
-		Assert.assertEquals("myInput", parameter.getRequired("input"));
-		Assert.assertEquals("myDefaultValue", parameter.get("output", "myDefaultValue"));
-		Assert.assertEquals(null, parameter.get("whatever"));
-		Assert.assertEquals(15L, parameter.getLong("expectedCount", -1L));
-		Assert.assertTrue(parameter.getBoolean("thisIsUseful", true));
-		Assert.assertEquals(42, parameter.getByte("myDefaultByte", (byte) 42));
-		Assert.assertEquals(42, parameter.getShort("myDefaultShort", (short) 42));
-
-		Configuration config = parameter.getConfiguration();
-		Assert.assertEquals(15L, config.getLong("expectedCount", -1L));
-
-		Properties props = parameter.getProperties();
-		Assert.assertEquals("myInput", props.getProperty("input"));
-		props = null;
-
-		// -------- test the default file creation ------------
-		try {
-			String pathToFile = tmp.newFile().getAbsolutePath();
-			parameter.createPropertiesFile(pathToFile);
-			Properties defaultProps = new Properties();
-			try (FileInputStream fis = new FileInputStream(pathToFile)) {
-				defaultProps.load(fis);
-			}
-
-			Assert.assertEquals("myDefaultValue", defaultProps.get("output"));
-			Assert.assertEquals("-1", defaultProps.get("expectedCount"));
-			Assert.assertTrue(defaultProps.containsKey("input"));
-
-		} catch (IOException e) {
-			Assert.fail(e.getMessage());
-			e.printStackTrace();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 4f9d569..18aab07 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -43,7 +43,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 /**
- * The ExecutionEnviroment is the context in which a program is executed. A local environment will
+ * The ExecutionEnvironment is the context in which a program is executed. A local environment will
  * cause execution in the current JVM, a remote environment will cause execution on a remote
  * cluster installation.
  *
@@ -412,7 +412,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The
    * given inputName is set on the given job.
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def readHadoopFile[K, V](
       mapredInputFormat: MapredFileInputFormat[K, V],
@@ -429,7 +434,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A
    * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def readHadoopFile[K, V](
       mapredInputFormat: MapredFileInputFormat[K, V],
@@ -443,7 +453,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a [[DataSet]] from [[org.apache.hadoop.mapred.SequenceFileInputFormat]]
    * A [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readSequenceFile]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def readSequenceFile[K, V](
       key: Class[K],
@@ -456,7 +471,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]].
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def createHadoopInput[K, V](
       mapredInputFormat: MapredInputFormat[K, V],
@@ -471,7 +491,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
    * The given inputName is set on the given job.
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def readHadoopFile[K, V](
       mapreduceInputFormat: MapreduceFileInputFormat[K, V],
@@ -489,7 +514,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Creates a [[DataSet]] from the given
    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A
    * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created.
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def readHadoopFile[K, V](
       mapreduceInputFormat: MapreduceFileInputFormat[K, V],
@@ -502,7 +532,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]].
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def createHadoopInput[K, V](
       mapreduceInputFormat: MapreduceInputFormat[K, V],

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
index a0e3468..80f311a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
@@ -52,11 +53,24 @@ public class WordCountMapredITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
+		internalRun(true);
+		postSubmit();
+		resultPath = getTempDirPath("result2");
+		internalRun(false);
+	}
+
+	private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
+		DataSet<Tuple2<LongWritable, Text>> input;
 
-		DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(),
+		if (isTestDeprecatedAPI) {
+			input = env.readHadoopFile(new TextInputFormat(),
 				LongWritable.class, Text.class, textPath);
+		} else {
+			input = env.createInput(readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath));
+		}
 
 		DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
index fee49bf..3293770 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
@@ -52,11 +53,23 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		internalRun(true);
+		postSubmit();
+		resultPath = getTempDirPath("result2");
+		internalRun(false);
+	}
 
+	private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(),
+		DataSet<Tuple2<LongWritable, Text>> input;
+		if (isTestDeprecatedAPI) {
+			input = env.readHadoopFile(new TextInputFormat(),
 				LongWritable.class, Text.class, textPath);
+		} else {
+			input = env.createInput(readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath));
+		}
 
 		DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
index b09ecc4..6b414d6 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
@@ -18,12 +18,12 @@
 package org.apache.flink.api.scala.hadoop.mapred
 
 import org.apache.flink.api.scala._
-
+import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
+import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{Text, LongWritable}
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, TextInputFormat}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextInputFormat, TextOutputFormat}
 
 class WordCountMapredITCase extends JavaProgramTestBase {
   protected var textPath: String = null
@@ -39,21 +39,27 @@ class WordCountMapredITCase extends JavaProgramTestBase {
                                                 resultPath, Array[String](".", "_"))
   }
 
-  protected def testProgram() {
+  private def internalRun (testDeprecatedAPI: Boolean): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val input =
-      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+      if (testDeprecatedAPI) {
+        env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+      } else {
+        env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
+          classOf[Text], textPath))
+      }
 
-    val text = input map { _._2.toString }
-    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-      .map { (_, 1) }
+    val counts = input
+      .map(_._2.toString)
+      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
       .groupBy(0)
       .sum(1)
 
-    val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }
+    val words = counts
+      .map( t => (new Text(t._1), new LongWritable(t._2)) )
 
-    val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable](
+    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
       new TextOutputFormat[Text, LongWritable],
       new JobConf)
     hadoopOutputFormat.getJobConf.set("mapred.textoutputformat.separator", " ")
@@ -64,5 +70,12 @@ class WordCountMapredITCase extends JavaProgramTestBase {
 
     env.execute("Hadoop Compat WordCount")
   }
+
+  protected def testProgram() {
+    internalRun(testDeprecatedAPI = true)
+    postSubmit()
+    resultPath = getTempDirPath("result2")
+    internalRun(testDeprecatedAPI = false)
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
index de2d376..e393d23 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.api.scala.hadoop.mapreduce
 
 import org.apache.flink.api.scala._
+import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
 import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
 import org.apache.hadoop.fs.Path
@@ -42,21 +43,34 @@ class WordCountMapreduceITCase extends JavaProgramTestBase {
   }
 
   protected def testProgram() {
+    internalRun(testDeprecatedAPI = true)
+    postSubmit()
+    resultPath = getTempDirPath("result2")
+    internalRun(testDeprecatedAPI = false)
+  }
+
+  private def internalRun (testDeprecatedAPI: Boolean): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val input =
-      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+      if (testDeprecatedAPI) {
+        env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+      } else {
+        env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
+          classOf[Text], textPath))
+      }
 
-    val text = input map { _._2.toString }
-    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-      .map { (_, 1) }
+    val counts = input
+      .map(_._2.toString)
+      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
       .groupBy(0)
       .sum(1)
 
-    val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }
+    val words = counts
+      .map( t => (new Text(t._1), new LongWritable(t._2)) )
 
     val job = Job.getInstance()
-    val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable](
+    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
       new TextOutputFormat[Text, LongWritable],
       job)
     hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " ")