You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/11/02 20:46:39 UTC

[1/5] flink git commit: [FLINK-4996] [core] Make CrossHint @Public

Repository: flink
Updated Branches:
  refs/heads/master da991aebb -> 7d61e1f2f


[FLINK-4996] [core] Make CrossHint @Public

This closes #2743.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6346a899
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6346a899
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6346a899

Branch: refs/heads/master
Commit: 6346a89972416489bc43ee30946078341496d1e1
Parents: da991ae
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Nov 2 11:02:51 2016 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 2 18:20:27 2016 +0100

----------------------------------------------------------------------
 .../flink/api/common/operators/base/CrossOperatorBase.java   | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6346a899/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
index 95d5bc3..72da53d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -33,6 +31,9 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * @see org.apache.flink.api.common.functions.CrossFunction
  */
@@ -42,6 +43,7 @@ public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<IN1, IN2,
 	/**
 	 * The cross hint tells the system which sizes to expect from the data sets
 	 */
+	@Public
 	public static enum CrossHint {
 		
 		OPTIMIZER_CHOOSES,


[2/5] flink git commit: [FLINK-4943] Fix typo ConfigConstants JavaDocs: YYARN -> YARN

Posted by fh...@apache.org.
[FLINK-4943] Fix typo ConfigConstants JavaDocs: YYARN -> YARN

This closes #2704.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed6a602b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed6a602b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed6a602b

Branch: refs/heads/master
Commit: ed6a602b34d185c1482b60b06ff585d08dab308b
Parents: 6346a89
Author: Makman2 <ma...@alice.de>
Authored: Thu Oct 27 15:16:12 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 2 18:22:33 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/configuration/ConfigConstants.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed6a602b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 0561fd7..10dbaaa 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -342,7 +342,7 @@ public final class ConfigConstants {
 	// ------------------------ YARN Configuration ------------------------
 
 	/**
-	 * The vcores exposed by YYARN.
+	 * The vcores exposed by YARN.
 	 */
 	public static final String YARN_VCORES = "yarn.containers.vcores";
 


[3/5] flink git commit: [FLINK-4623] [table] Add physical execution plan to StreamTableEnvironment explain().

Posted by fh...@apache.org.
[FLINK-4623] [table] Add physical execution plan to StreamTableEnvironment explain().

This closes #2720.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d60fe723
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d60fe723
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d60fe723

Branch: refs/heads/master
Commit: d60fe723aa357733c6ad8715b0e8c4e55ab7f52d
Parents: ed6a602
Author: anton solovev <an...@epam.com>
Authored: Tue Oct 25 15:55:42 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 2 18:30:18 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/explain/PlanJsonParser.java | 15 ++++++++-----
 .../api/table/StreamTableEnvironment.scala      | 18 +++++++++++++---
 .../api/scala/stream/ExplainStreamTest.scala    | 22 ++++++++++++++------
 .../test/scala/resources/testFilterStream0.out  | 13 ++++++++++++
 .../test/scala/resources/testUnionStream0.out   | 16 ++++++++++++++
 5 files changed, 70 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
index 3c4d3d9..bd14cd2 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
@@ -62,7 +62,7 @@ public class PlanJsonParser {
 			if (dele > -1) {
 				content = tempNode.getContents().substring(0, dele);
 			}
-			
+
 			//replace with certain content if node is dataSource to pass
 			//unit tests, because java and scala use different api to
 			//get input element
@@ -76,8 +76,11 @@ public class PlanJsonParser {
 				printTab(tabCount + 1, pw);
 				pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
 
-				printTab(tabCount + 1, pw);
-				pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n");
+				String mode = predecessors.get(0).getExchange_mode();
+				if (mode != null) {
+					printTab(tabCount + 1, pw);
+					pw.print("exchange_mode : " + mode + "\n");
+				}
 			}
 
 			if (tempNode.getDriver_strategy() != null) {
@@ -85,9 +88,11 @@ public class PlanJsonParser {
 				pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
 			}
 
-			printTab(tabCount + 1, pw);
-			pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
+			if (tempNode.getGlobal_properties() != null) {
+				printTab(tabCount + 1, pw);
+				pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
 					+ tempNode.getGlobal_properties().get(0).getValue() + "\n");
+			}
 
 			if (extended) {
 				List<Global_properties> globalProperties = tempNode.getGlobal_properties();

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index b9e889d..bca8d79 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -26,6 +26,8 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{Programs, RuleSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
@@ -311,14 +313,24 @@ abstract class StreamTableEnvironment(
     *
     * @param table The table for which the AST and execution plan will be returned.
     */
-   def explain(table: Table): String = {
+  def explain(table: Table): String = {
 
     val ast = RelOptUtil.toString(table.getRelNode)
 
+    val dataStream = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
+
+    val env = dataStream.getExecutionEnvironment
+    val jsonSqlPlan = env.getExecutionPlan
+
+    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
+
     s"== Abstract Syntax Tree ==" +
       System.lineSeparator +
-      ast
-
+      s"$ast" +
+      System.lineSeparator +
+      s"== Physical Execution Plan ==" +
+      System.lineSeparator +
+      s"$sqlPlan"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
index 71500f1..5eebb34 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
@@ -40,10 +40,12 @@ class ExplainStreamTest
       .toTable(tEnv, 'a, 'b)
       .filter("a % 2 = 0")
 
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val result = replaceString(tEnv.explain(table))
+
     val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilterStream0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+      "../../src/test/scala/resources/testFilterStream0.out").mkString
+    val expect = replaceString(source)
+    assertEquals(result, expect)
   }
 
   @Test
@@ -55,10 +57,18 @@ class ExplainStreamTest
     val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
     val table = table1.unionAll(table2)
 
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val result = replaceString(tEnv.explain(table))
+
     val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnionStream0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+      "../../src/test/scala/resources/testUnionStream0.out").mkString
+    val expect = replaceString(source)
+    assertEquals(result, expect)
   }
 
+  def replaceString(s: String): String = {
+    /* Stage {id} is ignored, because id keeps incrementing in test class
+     * while StreamExecutionEnvironment is up
+     */
+    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
index 3fda6de..20ae2b1 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
@@ -1,3 +1,16 @@
 == Abstract Syntax Tree ==
 LogicalFilter(condition=[=(MOD($0, 2), 0)])
   LogicalTableScan(table=[[_DataStreamTable_0]])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 2 : Operator
+		content : from: (a, b)
+		ship_strategy : REBALANCE
+
+		Stage 3 : Operator
+			content : where: (=(MOD(a, 2), 0)), select: (a, b)
+			ship_strategy : FORWARD
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
index b2e3000..ac3635d 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -2,3 +2,19 @@
 LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataStreamTable_0]])
   LogicalTableScan(table=[[_DataStreamTable_1]])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+Stage 2 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 3 : Operator
+		content : from: (count, word)
+		ship_strategy : REBALANCE
+
+		Stage 4 : Operator
+			content : from: (count, word)
+			ship_strategy : REBALANCE
+


[4/5] flink git commit: [FLINK-4743] [table] Add support for power(DOUBLE, DECIMAL) function.

Posted by fh...@apache.org.
[FLINK-4743] [table] Add support for power(DOUBLE, DECIMAL) function.

This closes #2686.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45651700
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45651700
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45651700

Branch: refs/heads/master
Commit: 4565170088595838ec53f3ca9b898126c62abbbc
Parents: d60fe72
Author: anton solovev <an...@epam.com>
Authored: Mon Oct 24 13:47:41 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 2 18:30:58 2016 +0100

----------------------------------------------------------------------
 .../table/codegen/calls/BuiltInMethods.scala    |  3 +-
 .../api/table/expressions/InputTypeSpec.scala   |  4 +-
 .../table/functions/utils/MathFunctions.scala   | 29 +++++++
 .../table/expressions/ScalarFunctionsTest.scala | 90 +++++++++++++++++++-
 4 files changed, 122 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/45651700/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala
index a2715f0..6a6898d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala
@@ -21,13 +21,14 @@ import java.math.{BigDecimal => JBigDecimal}
 
 import org.apache.calcite.linq4j.tree.Types
 import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.table.functions.utils.MathFunctions
 
 object BuiltInMethods {
   val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
   val EXP = Types.lookupMethod(classOf[Math], "exp", classOf[Double])
   val POWER = Types.lookupMethod(classOf[Math], "pow", classOf[Double], classOf[Double])
   val POWER_DEC = Types.lookupMethod(
-    classOf[SqlFunctions], "power", classOf[Long], classOf[JBigDecimal])
+    classOf[MathFunctions], "power", classOf[Double], classOf[JBigDecimal])
   val LN = Types.lookupMethod(classOf[Math], "log", classOf[Double])
   val ABS = Types.lookupMethod(classOf[SqlFunctions], "abs", classOf[Double])
   val ABS_DEC = Types.lookupMethod(classOf[SqlFunctions], "abs", classOf[JBigDecimal])

http://git-wip-us.apache.org/repos/asf/flink/blob/45651700/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
index f545d13..67e44a1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
@@ -49,7 +49,9 @@ trait InputTypeSpec extends Expression {
       ValidationSuccess
     } else {
       ValidationFailure(
-        s"$this fails on input type checking: ${typeMismatches.mkString("[", ", ", "]")}")
+        s"""|$this fails on input type checking: ${typeMismatches.mkString("[", ", ", "]")}.
+            |Operand should be casted to proper type
+            |""".stripMargin)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/45651700/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
new file mode 100644
index 0000000..8a0fe65
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions.utils
+
+import java.math.{BigDecimal => JBigDecimal}
+
+class MathFunctions {}
+
+object MathFunctions {
+  def power(a: Double, b: JBigDecimal): Double = {
+    Math.pow(a, b.doubleValue())
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/45651700/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index 46aff01..a6975b9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -354,6 +354,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
 
   @Test
   def testPower(): Unit = {
+    // f7: int , f4: long, f6: double
     testAllApis(
       'f2.power('f7),
       "f2.power(f7)",
@@ -377,11 +378,94 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       "f4.power(f5)",
       "POWER(f4, f5)",
       math.pow(44.toLong, 4.5.toFloat).toString)
+
+    // f5: float
+    testAllApis('f5.power('f5),
+      "f5.power(f5)",
+      "power(f5, f5)",
+      math.pow(4.5F, 4.5F).toString)
+
+    testAllApis('f5.power('f6),
+      "f5.power(f6)",
+      "power(f5, f6)",
+      math.pow(4.5F, 4.6D).toString)
+
+    testAllApis('f5.power('f7),
+      "f5.power(f7)",
+      "power(f5, f7)",
+      math.pow(4.5F, 3).toString)
+
+    testAllApis('f5.power('f4),
+      "f5.power(f4)",
+      "power(f5, f4)",
+      math.pow(4.5F, 44L).toString)
+
+    // f22: bigDecimal
+    // TODO delete casting in SQL when CALCITE-1467 is fixed
+    testAllApis(
+      'f22.cast(Types.DOUBLE).power('f5),
+      "f22.cast(DOUBLE).power(f5)",
+      "power(CAST(f22 AS DOUBLE), f5)",
+      math.pow(2, 4.5F).toString)
+
+    testAllApis(
+      'f22.cast(Types.DOUBLE).power('f6),
+      "f22.cast(DOUBLE).power(f6)",
+      "power(CAST(f22 AS DOUBLE), f6)",
+      math.pow(2, 4.6D).toString)
+
+    testAllApis(
+      'f22.cast(Types.DOUBLE).power('f7),
+      "f22.cast(DOUBLE).power(f7)",
+      "power(CAST(f22 AS DOUBLE), f7)",
+      math.pow(2, 3).toString)
+
+    testAllApis(
+      'f22.cast(Types.DOUBLE).power('f4),
+      "f22.cast(DOUBLE).power(f4)",
+      "power(CAST(f22 AS DOUBLE), f4)",
+      math.pow(2, 44L).toString)
+
+    testAllApis(
+      'f6.power('f22.cast(Types.DOUBLE)),
+      "f6.power(f22.cast(DOUBLE))",
+      "power(f6, f22)",
+      math.pow(4.6D, 2).toString)
   }
 
   @Test
   def testSqrt(): Unit = {
     testAllApis(
+      'f6.sqrt(),
+      "f6.sqrt",
+      "SQRT(f6)",
+      math.sqrt(4.6D).toString)
+
+    testAllApis(
+      'f7.sqrt(),
+      "f7.sqrt",
+      "SQRT(f7)",
+      math.sqrt(3).toString)
+
+    testAllApis(
+      'f4.sqrt(),
+      "f4.sqrt",
+      "SQRT(f4)",
+      math.sqrt(44L).toString)
+
+    testAllApis(
+      'f22.cast(Types.DOUBLE).sqrt(),
+      "f22.cast(DOUBLE).sqrt",
+      "SQRT(CAST(f22 AS DOUBLE))",
+      math.sqrt(2.0).toString)
+
+    testAllApis(
+      'f5.sqrt(),
+      "f5.sqrt",
+      "SQRT(f5)",
+      math.pow(4.5F, 0.5).toString)
+
+    testAllApis(
       25.sqrt(),
       "25.sqrt()",
       "SQRT(25)",
@@ -944,7 +1028,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   // ----------------------------------------------------------------------------------------------
 
   def testData = {
-    val testData = new Row(22)
+    val testData = new Row(23)
     testData.setField(0, "This is a test String.")
     testData.setField(1, true)
     testData.setField(2, 42.toByte)
@@ -967,6 +1051,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
     testData.setField(19, 1467012213000L) // +16979 07:23:33.000
     testData.setField(20, 25) // +2-01
     testData.setField(21, null)
+    testData.setField(22, BigDecimal("2").bigDecimal)
     testData
   }
 
@@ -993,6 +1078,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       Types.TIMESTAMP,
       Types.INTERVAL_MILLIS,
       Types.INTERVAL_MONTHS,
-      Types.BOOLEAN)).asInstanceOf[TypeInformation[Any]]
+      Types.BOOLEAN,
+      Types.DECIMAL)).asInstanceOf[TypeInformation[Any]]
   }
 }


[5/5] flink git commit: [FLINK-4315] [dataSet] [hadoopCompat] Annotate Hadoop-related methods in ExecutionEnvironment as @Deprecated.

Posted by fh...@apache.org.
[FLINK-4315] [dataSet] [hadoopCompat] Annotate Hadoop-related methods in ExecutionEnvironment as @Deprecated.

- Preparation to remove Hadoop dependency from flink-java
- Alternatives for deprecated functionality are provided in flink-hadoop-compatibility via HadoopInputs

This closes #2637.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d61e1f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d61e1f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d61e1f2

Branch: refs/heads/master
Commit: 7d61e1f2fd0c9b0e3719b2d7252a164cfdf941c4
Parents: 4565170
Author: Evgeny_Kincharov <Ev...@epam.com>
Authored: Fri Oct 14 17:19:22 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 2 18:31:25 2016 +0100

----------------------------------------------------------------------
 .../flink-hadoop-compatibility/pom.xml          |  98 +++++++++++++
 .../flink/hadoopcompatibility/HadoopInputs.java | 118 +++++++++++++++
 .../flink/hadoopcompatibility/HadoopUtils.java  |  52 +++++++
 .../scala/HadoopInputs.scala                    | 143 +++++++++++++++++++
 .../hadoopcompatibility/HadoopUtilsTest.java    |  34 +++++
 flink-java/pom.xml                              |   2 +-
 .../flink/api/java/ExecutionEnvironment.java    |  33 ++++-
 .../flink/api/java/utils/ParameterTool.java     |   3 +
 .../java/utils/AbstractParameterToolTest.java   |  71 +++++++++
 .../flink/api/java/utils/ParameterToolTest.java |  46 +-----
 .../flink/api/scala/ExecutionEnvironment.scala  |  37 ++++-
 .../hadoop/mapred/WordCountMapredITCase.java    |  16 ++-
 .../mapreduce/WordCountMapreduceITCase.java     |  17 ++-
 .../hadoop/mapred/WordCountMapredITCase.scala   |  35 +++--
 .../mapreduce/WordCountMapreduceITCase.scala    |  26 +++-
 15 files changed, 661 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
index aa818f6..8143a03 100644
--- a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
+++ b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
@@ -47,6 +47,21 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>${shading-artifact.name}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
@@ -78,6 +93,89 @@ under the License.
 				<groupId>com.github.siom79.japicmp</groupId>
 				<artifactId>japicmp-maven-plugin</artifactId>
 			</plugin>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Scala Code Style, most of the configuration done via plugin management -->
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<configuration>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
new file mode 100644
index 0000000..9e8a3e4
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/**
+ * Static factory methods that wrap Apache Hadoop InputFormats as Apache Flink InputFormats.
+ *
+ * <p>Wrappers can be created for both the {@link org.apache.hadoop.mapred.InputFormat}
+ * and the {@link org.apache.hadoop.mapreduce.InputFormat} API.
+ *
+ * <p>Each key/value pair produced by a wrapped Hadoop InputFormat is converted into a Flink
+ * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} whose first field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) holds the key and whose second field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) holds the value.
+ */
+public final class HadoopInputs {
+	// ----------------------------------- Hadoop Input Format ---------------------------------------
+
+	/**
+	 * Wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat} in a Flink {@link InputFormat}.
+	 * The given input path is added to the given {@link JobConf} before the format is wrapped.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+		final org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(inputPath);
+		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, hadoopPath);
+		return createHadoopInput(mapredInputFormat, key, value, job);
+	}
+
+	/**
+	 * Wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat} in a Flink {@link InputFormat}.
+	 * The given input path is added to a newly created {@link JobConf}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+		final JobConf job = new JobConf();
+		return readHadoopFile(mapredInputFormat, key, value, inputPath, job);
+	}
+
+	/**
+	 * Creates a Flink {@link InputFormat} that reads a Hadoop sequence file with the given key and value classes.
+	 *
+	 * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
+		final org.apache.hadoop.mapred.SequenceFileInputFormat<K, V> sequenceFormat = new org.apache.hadoop.mapred.SequenceFileInputFormat<>();
+		return readHadoopFile(sequenceFormat, key, value, inputPath);
+	}
+
+	/**
+	 * Wraps the given Hadoop {@link org.apache.hadoop.mapred.InputFormat} in a Flink {@link InputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+	 */
+	public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+		return new HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+	}
+
+	/**
+	 * Wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat} in a Flink {@link InputFormat}.
+	 * The given input path is added to the given {@link Job} before the format is wrapped.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
+		final org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(inputPath);
+		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, hadoopPath);
+		return createHadoopInput(mapreduceInputFormat, key, value, job);
+	}
+
+	/**
+	 * Wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat} in a Flink {@link InputFormat}.
+	 * The given input path is added to a {@link Job} obtained from {@link Job#getInstance()}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+	 */
+	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+			org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
+		final Job job = Job.getInstance();
+		return readHadoopFile(mapreduceInputFormat, key, value, inputPath, job);
+	}
+
+	/**
+	 * Wraps the given Hadoop {@link org.apache.hadoop.mapreduce.InputFormat} in a Flink {@link InputFormat}.
+	 *
+	 * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+	 */
+	public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
+			org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
+		return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapreduceInputFormat, key, value, job);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
new file mode 100644
index 0000000..97ca329
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.commons.cli.Option;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class to work with Apache Hadoop libraries.
+ */
+public class HadoopUtils {
+	/**
+	 * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}.
+	 * Every parsed option value is expected to have the form {@code key=value}.
+	 *
+	 * @param args Input array arguments. It should be parsable by {@link GenericOptionsParser}
+	 * @return A {@link ParameterTool}
+	 * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
+	 * @see GenericOptionsParser
+	 */
+	public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException {
+		Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();
+		Map<String, String> map = new HashMap<String, String>();
+		for (Option option : options) {
+			// split at the first '=' only, so that values which contain '=' are not truncated
+			String[] split = option.getValue().split("=", 2);
+			map.put(split[0], split[1]);
+		}
+		return ParameterTool.fromMap(map);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
new file mode 100644
index 0000000..133a5f4
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.hadoopcompatibility.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.hadoop.mapreduce
+import org.apache.flink.api.scala.hadoop.mapred
+import org.apache.hadoop.fs.{Path => HadoopPath}
+import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
+import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat}
+
+/**
+  * Factory methods that wrap Apache Hadoop InputFormats as Apache Flink InputFormats.
+  *
+  * Wrappers can be created for both the [[org.apache.hadoop.mapred.InputFormat]] and the
+  * [[org.apache.hadoop.mapreduce.InputFormat]] API.
+  *
+  * Each key/value pair produced by a wrapped Hadoop InputFormat is converted into a
+  * [[Tuple2]] whose first field holds the key and whose second field holds the value.
+  *
+  */
+object HadoopInputs {
+
+  /**
+    * Wraps the given Hadoop [[org.apache.hadoop.mapred.FileInputFormat]] in a Flink
+    * [[org.apache.flink.api.common.io.InputFormat]]. The given input path is added to the
+    * given [[org.apache.hadoop.mapred.JobConf]] before the format is wrapped.
+    */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    val hadoopPath = new HadoopPath(inputPath)
+    MapredFileInputFormat.addInputPath(job, hadoopPath)
+    createHadoopInput(mapredInputFormat, key, value, job)
+  }
+
+  /**
+    * Wraps the given Hadoop [[org.apache.hadoop.mapred.FileInputFormat]] in a Flink
+    * [[org.apache.flink.api.common.io.InputFormat]], adding the given input path to a newly
+    * created [[org.apache.hadoop.mapred.JobConf]].
+    */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    val job = new JobConf
+    readHadoopFile(mapredInputFormat, key, value, inputPath, job)
+  }
+
+  /**
+    * Builds a Flink [[org.apache.flink.api.common.io.InputFormat]] for reading a Hadoop
+    * sequence file whose records have the given key and value classes.
+    */
+  def readSequenceFile[K, V](
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    val sequenceFormat = new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V]
+    readHadoopFile(sequenceFormat, key, value, inputPath)
+  }
+
+  /**
+    * Wraps the given Hadoop [[org.apache.hadoop.mapred.InputFormat]] in a Flink
+    * [[org.apache.flink.api.common.io.InputFormat]].
+    */
+  def createHadoopInput[K, V](
+      mapredInputFormat: MapredInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+    new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+  }
+
+  /**
+    * Wraps the given Hadoop [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]] in a
+    * Flink [[org.apache.flink.api.common.io.InputFormat]]. The given input path is added to
+    * the given [[org.apache.hadoop.mapreduce.Job]] before the format is wrapped.
+    */
+  def readHadoopFile[K, V](
+      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
+
+    val hadoopPath = new HadoopPath(inputPath)
+    MapreduceFileInputFormat.addInputPath(job, hadoopPath)
+    createHadoopInput(mapreduceInputFormat, key, value, job)
+  }
+
+  /**
+    * Wraps the given Hadoop [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]] in a
+    * Flink [[org.apache.flink.api.common.io.InputFormat]], adding the given input path to a
+    * [[org.apache.hadoop.mapreduce.Job]] obtained from `Job.getInstance`.
+    */
+  def readHadoopFile[K, V](
+      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] =
+  {
+    val job = Job.getInstance
+    readHadoopFile(mapreduceInputFormat, key, value, inputPath, job)
+  }
+
+  /**
+    * Wraps the given Hadoop [[org.apache.hadoop.mapreduce.InputFormat]] in a Flink
+    * [[org.apache.flink.api.common.io.InputFormat]].
+    */
+  def createHadoopInput[K, V](
+      mapreduceInputFormat: MapreduceInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
+
+    new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
new file mode 100644
index 0000000..6f7673b
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.java.utils.AbstractParameterToolTest;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.junit.Test;
+
+import java.io.IOException;
+/** Runs the checks inherited from the base class against {@link HadoopUtils#paramsFromGenericOptionsParser(String[])}. */
+public class HadoopUtilsTest extends AbstractParameterToolTest {
+	@Test
+	public void testParamsFromGenericOptionsParser() throws IOException {
+		final String[] args = {"-D", "input=myInput", "-DexpectedCount=15"};
+		final ParameterTool parsed = HadoopUtils.paramsFromGenericOptionsParser(args);
+		validate(parsed);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 6924da8..5bc81c6 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -84,7 +84,7 @@ under the License.
 				<artifactId>japicmp-maven-plugin</artifactId>
 			</plugin>
 
-			<!-- Because flink-scala and flink-avro uses it in tests -->
+			<!-- Because flink-scala, flink-avro and flink-hadoop-compatibility use it in tests -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index add8531..964eed1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -585,9 +585,12 @@ public abstract class ExecutionEnvironment {
 	// ----------------------------------- Hadoop Input Format ---------------------------------------
 
 	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. The
-	 * given inputName is set on the given job.
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}.
+	 *
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String, JobConf)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
 		DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
@@ -600,7 +603,11 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat}
 	 * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
- 	 */
+	 *
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readSequenceFile(Class<K>, Class<V>, String)}
+	 * from the flink-hadoop-compatibility module.
+	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
 		return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
@@ -609,7 +616,11 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
 	 * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+	 *
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
 		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
@@ -617,7 +628,11 @@ public abstract class ExecutionEnvironment {
 
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
+	 *
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V>, Class<K>, Class<V>, JobConf)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
 		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
@@ -628,7 +643,11 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
 	 * given inputName is set on the given job.
+	 *
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String, Job)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
 		DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);
@@ -642,7 +661,11 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
 	 * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+	 *
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
 		return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
@@ -650,7 +673,11 @@ public abstract class ExecutionEnvironment {
 
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
+	 *
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V>, Class<K>, Class<V>, Job)}
+	 * from the flink-hadoop-compatibility module.
 	 */
+	@Deprecated
 	@PublicEvolving
 	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
 		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 8f504e4..8e15441 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -191,7 +191,10 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 	 * @return A {@link ParameterTool}
 	 * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
 	 * @see GenericOptionsParser
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopUtils#paramsFromGenericOptionsParser(String[])}
+	 * from project flink-hadoop-compatibility
 	 */
+	@Deprecated
 	@PublicEvolving
 	public static ParameterTool fromGenericOptionsParser(String[] args) throws IOException {
 		Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
new file mode 100644
index 0000000..9aa9a95
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+/** Shared assertions for tests of {@link ParameterTool} factory methods. */
+public abstract class AbstractParameterToolTest {
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	/**
+	 * Asserts the lookups, defaults, configuration, properties and generated properties file of the tool.
+	 *
+	 * @param parameter the tool under test; it must hold input=myInput and expectedCount=15
+	 */
+	protected void validate(ParameterTool parameter) {
+		ClosureCleaner.ensureSerializable(parameter);
+		Assert.assertEquals("myInput", parameter.getRequired("input"));
+		Assert.assertEquals("myDefaultValue", parameter.get("output", "myDefaultValue"));
+		Assert.assertNull(parameter.get("whatever"));
+		Assert.assertEquals(15L, parameter.getLong("expectedCount", -1L));
+		Assert.assertTrue(parameter.getBoolean("thisIsUseful", true));
+		Assert.assertEquals(42, parameter.getByte("myDefaultByte", (byte) 42));
+		Assert.assertEquals(42, parameter.getShort("myDefaultShort", (short) 42));
+
+		Configuration config = parameter.getConfiguration();
+		Assert.assertEquals(15L, config.getLong("expectedCount", -1L));
+		Assert.assertEquals("myInput", parameter.getProperties().getProperty("input"));
+
+		// -------- test the default file creation ------------
+		Properties defaultProps = new Properties();
+		try {
+			String pathToFile = tmp.newFile().getAbsolutePath();
+			parameter.createPropertiesFile(pathToFile);
+			try (FileInputStream fis = new FileInputStream(pathToFile)) {
+				defaultProps.load(fis);
+			}
+		} catch (IOException e) {
+			// report the I/O problem as a test failure and keep the exception as its cause
+			throw new AssertionError(e.getMessage(), e);
+		}
+		Assert.assertEquals("myDefaultValue", defaultProps.get("output"));
+		Assert.assertEquals("-1", defaultProps.get("expectedCount"));
+		Assert.assertTrue(defaultProps.containsKey("input"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
index 605f033..9b63985 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
@@ -18,25 +18,17 @@
 
 package org.apache.flink.api.java.utils;
 
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Map;
 import java.util.Properties;
 
-public class ParameterToolTest {
-
-	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
+public class ParameterToolTest extends AbstractParameterToolTest {
 
 	// ----- Parser tests -----------------
 
@@ -162,40 +154,4 @@ public class ParameterToolTest {
 		ParameterTool parameter = ParameterTool.fromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"});
 		validate(parameter);
 	}
-
-	private void validate(ParameterTool parameter) {
-		ClosureCleaner.ensureSerializable(parameter);
-		Assert.assertEquals("myInput", parameter.getRequired("input"));
-		Assert.assertEquals("myDefaultValue", parameter.get("output", "myDefaultValue"));
-		Assert.assertEquals(null, parameter.get("whatever"));
-		Assert.assertEquals(15L, parameter.getLong("expectedCount", -1L));
-		Assert.assertTrue(parameter.getBoolean("thisIsUseful", true));
-		Assert.assertEquals(42, parameter.getByte("myDefaultByte", (byte) 42));
-		Assert.assertEquals(42, parameter.getShort("myDefaultShort", (short) 42));
-
-		Configuration config = parameter.getConfiguration();
-		Assert.assertEquals(15L, config.getLong("expectedCount", -1L));
-
-		Properties props = parameter.getProperties();
-		Assert.assertEquals("myInput", props.getProperty("input"));
-		props = null;
-
-		// -------- test the default file creation ------------
-		try {
-			String pathToFile = tmp.newFile().getAbsolutePath();
-			parameter.createPropertiesFile(pathToFile);
-			Properties defaultProps = new Properties();
-			try (FileInputStream fis = new FileInputStream(pathToFile)) {
-				defaultProps.load(fis);
-			}
-
-			Assert.assertEquals("myDefaultValue", defaultProps.get("output"));
-			Assert.assertEquals("-1", defaultProps.get("expectedCount"));
-			Assert.assertTrue(defaultProps.containsKey("input"));
-
-		} catch (IOException e) {
-			Assert.fail(e.getMessage());
-			e.printStackTrace();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 4f9d569..18aab07 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -43,7 +43,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 /**
- * The ExecutionEnviroment is the context in which a program is executed. A local environment will
+ * The ExecutionEnvironment is the context in which a program is executed. A local environment will
  * cause execution in the current JVM, a remote environment will cause execution on a remote
  * cluster installation.
  *
@@ -412,7 +412,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The
    * given inputName is set on the given job.
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def readHadoopFile[K, V](
       mapredInputFormat: MapredFileInputFormat[K, V],
@@ -429,7 +434,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A
    * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def readHadoopFile[K, V](
       mapredInputFormat: MapredFileInputFormat[K, V],
@@ -443,7 +453,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a [[DataSet]] from [[org.apache.hadoop.mapred.SequenceFileInputFormat]]
    * A [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readSequenceFile]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def readSequenceFile[K, V](
       key: Class[K],
@@ -456,7 +471,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]].
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def createHadoopInput[K, V](
       mapredInputFormat: MapredInputFormat[K, V],
@@ -471,7 +491,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
    * The given inputName is set on the given job.
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def readHadoopFile[K, V](
       mapreduceInputFormat: MapreduceFileInputFormat[K, V],
@@ -489,7 +514,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Creates a [[DataSet]] from the given
    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A
    * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created.
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def readHadoopFile[K, V](
       mapreduceInputFormat: MapreduceFileInputFormat[K, V],
@@ -502,7 +532,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]].
+   *
+   * @deprecated Please use
+   *             [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]]
+   * from the flink-hadoop-compatibility module.
    */
+  @Deprecated
   @PublicEvolving
   def createHadoopInput[K, V](
       mapreduceInputFormat: MapreduceInputFormat[K, V],

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
index a0e3468..80f311a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
@@ -52,11 +53,24 @@ public class WordCountMapredITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
+		internalRun(true);
+		postSubmit();
+		resultPath = getTempDirPath("result2");
+		internalRun(false);
+	}
+
+	private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
+		DataSet<Tuple2<LongWritable, Text>> input;
 
-		DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(),
+		if (isTestDeprecatedAPI) {
+			input = env.readHadoopFile(new TextInputFormat(),
 				LongWritable.class, Text.class, textPath);
+		} else {
+			input = env.createInput(readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath));
+		}
 
 		DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
index fee49bf..3293770 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
@@ -52,11 +53,23 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		internalRun(true);
+		postSubmit();
+		resultPath = getTempDirPath("result2");
+		internalRun(false);
+	}
 
+	private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(),
+		DataSet<Tuple2<LongWritable, Text>> input;
+		if (isTestDeprecatedAPI) {
+			input = env.readHadoopFile(new TextInputFormat(),
 				LongWritable.class, Text.class, textPath);
+		} else {
+			input = env.createInput(readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath));
+		}
 
 		DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
index b09ecc4..6b414d6 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
@@ -18,12 +18,12 @@
 package org.apache.flink.api.scala.hadoop.mapred
 
 import org.apache.flink.api.scala._
-
+import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
+import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{Text, LongWritable}
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, TextInputFormat}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextInputFormat, TextOutputFormat}
 
 class WordCountMapredITCase extends JavaProgramTestBase {
   protected var textPath: String = null
@@ -39,21 +39,27 @@ class WordCountMapredITCase extends JavaProgramTestBase {
                                                 resultPath, Array[String](".", "_"))
   }
 
-  protected def testProgram() {
+  private def internalRun (testDeprecatedAPI: Boolean): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val input =
-      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+      if (testDeprecatedAPI) {
+        env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+      } else {
+        env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
+          classOf[Text], textPath))
+      }
 
-    val text = input map { _._2.toString }
-    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-      .map { (_, 1) }
+    val counts = input
+      .map(_._2.toString)
+      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
       .groupBy(0)
       .sum(1)
 
-    val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }
+    val words = counts
+      .map( t => (new Text(t._1), new LongWritable(t._2)) )
 
-    val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable](
+    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
       new TextOutputFormat[Text, LongWritable],
       new JobConf)
     hadoopOutputFormat.getJobConf.set("mapred.textoutputformat.separator", " ")
@@ -64,5 +70,12 @@ class WordCountMapredITCase extends JavaProgramTestBase {
 
     env.execute("Hadoop Compat WordCount")
   }
+
+  protected def testProgram() {
+    internalRun(testDeprecatedAPI = true)
+    postSubmit()
+    resultPath = getTempDirPath("result2")
+    internalRun(testDeprecatedAPI = false)
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
index de2d376..e393d23 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.api.scala.hadoop.mapreduce
 
 import org.apache.flink.api.scala._
+import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
 import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
 import org.apache.hadoop.fs.Path
@@ -42,21 +43,34 @@ class WordCountMapreduceITCase extends JavaProgramTestBase {
   }
 
   protected def testProgram() {
+    internalRun(testDeprecatedAPI = true)
+    postSubmit()
+    resultPath = getTempDirPath("result2")
+    internalRun(testDeprecatedAPI = false)
+  }
+
+  private def internalRun (testDeprecatedAPI: Boolean): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val input =
-      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+      if (testDeprecatedAPI) {
+        env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+      } else {
+        env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
+          classOf[Text], textPath))
+      }
 
-    val text = input map { _._2.toString }
-    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-      .map { (_, 1) }
+    val counts = input
+      .map(_._2.toString)
+      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
       .groupBy(0)
       .sum(1)
 
-    val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }
+    val words = counts
+      .map( t => (new Text(t._1), new LongWritable(t._2)) )
 
     val job = Job.getInstance()
-    val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable](
+    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
       new TextOutputFormat[Text, LongWritable],
       job)
     hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " ")