Posted to commits@flink.apache.org by fh...@apache.org on 2016/11/02 20:46:39 UTC
[1/5] flink git commit: [FLINK-4996] [core] Make CrossHint @Public
Repository: flink
Updated Branches:
refs/heads/master da991aebb -> 7d61e1f2f
[FLINK-4996] [core] Make CrossHint @Public
This closes #2743.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6346a899
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6346a899
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6346a899
Branch: refs/heads/master
Commit: 6346a89972416489bc43ee30946078341496d1e1
Parents: da991ae
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Nov 2 11:02:51 2016 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 2 18:20:27 2016 +0100
----------------------------------------------------------------------
.../flink/api/common/operators/base/CrossOperatorBase.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6346a899/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
index 95d5bc3..72da53d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
@@ -18,10 +18,8 @@
package org.apache.flink.api.common.operators.base;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -33,6 +31,9 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* @see org.apache.flink.api.common.functions.CrossFunction
*/
@@ -42,6 +43,7 @@ public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<IN1, IN2,
/**
* The cross hint tells the system which sizes to expect from the data sets
*/
+ @Public
public static enum CrossHint {
OPTIMIZER_CHOOSES,
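For context, a minimal sketch of how user code reaches CrossHint through the
DataSet API; the data and variable names below are illustrative, not part of
this commit:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class CrossHintSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Integer> large = env.fromElements(1, 2, 3, 4, 5);
            DataSet<String> small = env.fromElements("a", "b");
            // crossWithTiny passes CrossHint.SECOND_IS_SMALL to the optimizer;
            // crossWithHuge would pass CrossHint.FIRST_IS_SMALL instead.
            large.crossWithTiny(small).print();
        }
    }

Annotating the enum @Public makes such size hints part of the stable,
user-facing API rather than an @Internal detail.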
[2/5] flink git commit: [FLINK-4943] Fix typo ConfigConstants JavaDocs: YYARN -> YARN
Posted by fh...@apache.org.
[FLINK-4943] Fix typo ConfigConstants JavaDocs: YYARN -> YARN
This closes #2704.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed6a602b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed6a602b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed6a602b
Branch: refs/heads/master
Commit: ed6a602b34d185c1482b60b06ff585d08dab308b
Parents: 6346a89
Author: Makman2 <ma...@alice.de>
Authored: Thu Oct 27 15:16:12 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 2 18:22:33 2016 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/configuration/ConfigConstants.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ed6a602b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 0561fd7..10dbaaa 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -342,7 +342,7 @@ public final class ConfigConstants {
// ------------------------ YARN Configuration ------------------------
/**
- * The vcores exposed by YYARN.
+ * The vcores exposed by YARN.
*/
public static final String YARN_VCORES = "yarn.containers.vcores";
[3/5] flink git commit: [FLINK-4623] [table] Add physical execution plan to StreamTableEnvironment explain().
Posted by fh...@apache.org.
[FLINK-4623] [table] Add physical execution plan to StreamTableEnvironment explain().
This closes #2720.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d60fe723
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d60fe723
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d60fe723
Branch: refs/heads/master
Commit: d60fe723aa357733c6ad8715b0e8c4e55ab7f52d
Parents: ed6a602
Author: anton solovev <an...@epam.com>
Authored: Tue Oct 25 15:55:42 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 2 18:30:18 2016 +0100
----------------------------------------------------------------------
.../flink/api/table/explain/PlanJsonParser.java | 15 ++++++++-----
.../api/table/StreamTableEnvironment.scala | 18 +++++++++++++---
.../api/scala/stream/ExplainStreamTest.scala | 22 ++++++++++++++------
.../test/scala/resources/testFilterStream0.out | 13 ++++++++++++
.../test/scala/resources/testUnionStream0.out | 16 ++++++++++++++
5 files changed, 70 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
index 3c4d3d9..bd14cd2 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
@@ -62,7 +62,7 @@ public class PlanJsonParser {
if (dele > -1) {
content = tempNode.getContents().substring(0, dele);
}
-
+
//replace with certain content if node is dataSource to pass
//unit tests, because java and scala use different api to
//get input element
@@ -76,8 +76,11 @@ public class PlanJsonParser {
printTab(tabCount + 1, pw);
pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
- printTab(tabCount + 1, pw);
- pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n");
+ String mode = predecessors.get(0).getExchange_mode();
+ if (mode != null) {
+ printTab(tabCount + 1, pw);
+ pw.print("exchange_mode : " + mode + "\n");
+ }
}
if (tempNode.getDriver_strategy() != null) {
@@ -85,9 +88,11 @@ public class PlanJsonParser {
pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
}
- printTab(tabCount + 1, pw);
- pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
+ if (tempNode.getGlobal_properties() != null) {
+ printTab(tabCount + 1, pw);
+ pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
+ tempNode.getGlobal_properties().get(0).getValue() + "\n");
+ }
if (extended) {
List<Global_properties> globalProperties = tempNode.getGlobal_properties();
http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index b9e889d..bca8d79 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -26,6 +26,8 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{Programs, RuleSet}
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.explain.PlanJsonParser
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
@@ -311,14 +313,24 @@ abstract class StreamTableEnvironment(
*
* @param table The table for which the AST and execution plan will be returned.
*/
- def explain(table: Table): String = {
+ def explain(table: Table): String = {
val ast = RelOptUtil.toString(table.getRelNode)
+ val dataStream = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
+
+ val env = dataStream.getExecutionEnvironment
+ val jsonSqlPlan = env.getExecutionPlan
+
+ val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
+
s"== Abstract Syntax Tree ==" +
System.lineSeparator +
- ast
-
+ s"$ast" +
+ System.lineSeparator +
+ s"== Physical Execution Plan ==" +
+ System.lineSeparator +
+ s"$sqlPlan"
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
index 71500f1..5eebb34 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
@@ -40,10 +40,12 @@ class ExplainStreamTest
.toTable(tEnv, 'a, 'b)
.filter("a % 2 = 0")
- val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+ val result = replaceString(tEnv.explain(table))
+
val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testFilterStream0.out").mkString.replaceAll("\\r\\n", "\n")
- assertEquals(result, source)
+ "../../src/test/scala/resources/testFilterStream0.out").mkString
+ val expect = replaceString(source)
+ assertEquals(result, expect)
}
@Test
@@ -55,10 +57,18 @@ class ExplainStreamTest
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1.unionAll(table2)
- val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+ val result = replaceString(tEnv.explain(table))
+
val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testUnionStream0.out").mkString.replaceAll("\\r\\n", "\n")
- assertEquals(result, source)
+ "../../src/test/scala/resources/testUnionStream0.out").mkString
+ val expect = replaceString(source)
+ assertEquals(result, expect)
}
+ def replaceString(s: String): String = {
+ /* Stage {id} is ignored, because id keeps incrementing in test class
+ * while StreamExecutionEnvironment is up
+ */
+ s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
index 3fda6de..20ae2b1 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
@@ -1,3 +1,16 @@
== Abstract Syntax Tree ==
LogicalFilter(condition=[=(MOD($0, 2), 0)])
LogicalTableScan(table=[[_DataStreamTable_0]])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+ content : collect elements with CollectionInputFormat
+
+ Stage 2 : Operator
+ content : from: (a, b)
+ ship_strategy : REBALANCE
+
+ Stage 3 : Operator
+ content : where: (=(MOD(a, 2), 0)), select: (a, b)
+ ship_strategy : FORWARD
+
http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
index b2e3000..ac3635d 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -2,3 +2,19 @@
LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataStreamTable_0]])
LogicalTableScan(table=[[_DataStreamTable_1]])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+ content : collect elements with CollectionInputFormat
+
+Stage 2 : Data Source
+ content : collect elements with CollectionInputFormat
+
+ Stage 3 : Operator
+ content : from: (count, word)
+ ship_strategy : REBALANCE
+
+ Stage 4 : Operator
+ content : from: (count, word)
+ ship_strategy : REBALANCE
+
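A hedged usage sketch of the extended explain(); the Java Table API names of
this period are assumed, and the setup below is illustrative rather than taken
from the patch:

    import org.apache.flink.api.java.table.StreamTableEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.table.Table;
    import org.apache.flink.api.table.TableEnvironment;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExplainSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
            DataStream<Tuple2<Integer, String>> stream = env.fromElements(Tuple2.of(1, "hello"));
            Table table = tEnv.fromDataStream(stream, "count, word");
            // Before this commit explain() returned only the abstract syntax tree;
            // it now appends an "== Physical Execution Plan ==" section like the
            // .out files above.
            System.out.println(tEnv.explain(table));
        }
    }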
[4/5] flink git commit: [FLINK-4743] [table] Add support for power(DOUBLE, DECIMAL) function.
Posted by fh...@apache.org.
[FLINK-4743] [table] Add support for power(DOUBLE, DECIMAL) function.
This closes #2686.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45651700
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45651700
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45651700
Branch: refs/heads/master
Commit: 4565170088595838ec53f3ca9b898126c62abbbc
Parents: d60fe72
Author: anton solovev <an...@epam.com>
Authored: Mon Oct 24 13:47:41 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 2 18:30:58 2016 +0100
----------------------------------------------------------------------
.../table/codegen/calls/BuiltInMethods.scala | 3 +-
.../api/table/expressions/InputTypeSpec.scala | 4 +-
.../table/functions/utils/MathFunctions.scala | 29 +++++++
.../table/expressions/ScalarFunctionsTest.scala | 90 +++++++++++++++++++-
4 files changed, 122 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/45651700/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala
index a2715f0..6a6898d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala
@@ -21,13 +21,14 @@ import java.math.{BigDecimal => JBigDecimal}
import org.apache.calcite.linq4j.tree.Types
import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.table.functions.utils.MathFunctions
object BuiltInMethods {
val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
val EXP = Types.lookupMethod(classOf[Math], "exp", classOf[Double])
val POWER = Types.lookupMethod(classOf[Math], "pow", classOf[Double], classOf[Double])
val POWER_DEC = Types.lookupMethod(
- classOf[SqlFunctions], "power", classOf[Long], classOf[JBigDecimal])
+ classOf[MathFunctions], "power", classOf[Double], classOf[JBigDecimal])
val LN = Types.lookupMethod(classOf[Math], "log", classOf[Double])
val ABS = Types.lookupMethod(classOf[SqlFunctions], "abs", classOf[Double])
val ABS_DEC = Types.lookupMethod(classOf[SqlFunctions], "abs", classOf[JBigDecimal])
http://git-wip-us.apache.org/repos/asf/flink/blob/45651700/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
index f545d13..67e44a1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
@@ -49,7 +49,9 @@ trait InputTypeSpec extends Expression {
ValidationSuccess
} else {
ValidationFailure(
- s"$this fails on input type checking: ${typeMismatches.mkString("[", ", ", "]")}")
+ s"""|$this fails on input type checking: ${typeMismatches.mkString("[", ", ", "]")}.
+ |Operand should be casted to proper type
+ |""".stripMargin)
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/45651700/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
new file mode 100644
index 0000000..8a0fe65
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions.utils
+
+import java.math.{BigDecimal => JBigDecimal}
+
+class MathFunctions {}
+
+object MathFunctions {
+ def power(a: Double, b: JBigDecimal): Double = {
+ Math.pow(a, b.doubleValue())
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/45651700/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index 46aff01..a6975b9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -354,6 +354,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
@Test
def testPower(): Unit = {
+ // f7: int , f4: long, f6: double
testAllApis(
'f2.power('f7),
"f2.power(f7)",
@@ -377,11 +378,94 @@ class ScalarFunctionsTest extends ExpressionTestBase {
"f4.power(f5)",
"POWER(f4, f5)",
math.pow(44.toLong, 4.5.toFloat).toString)
+
+ // f5: float
+ testAllApis('f5.power('f5),
+ "f5.power(f5)",
+ "power(f5, f5)",
+ math.pow(4.5F, 4.5F).toString)
+
+ testAllApis('f5.power('f6),
+ "f5.power(f6)",
+ "power(f5, f6)",
+ math.pow(4.5F, 4.6D).toString)
+
+ testAllApis('f5.power('f7),
+ "f5.power(f7)",
+ "power(f5, f7)",
+ math.pow(4.5F, 3).toString)
+
+ testAllApis('f5.power('f4),
+ "f5.power(f4)",
+ "power(f5, f4)",
+ math.pow(4.5F, 44L).toString)
+
+ // f22: bigDecimal
+ // TODO delete casting in SQL when CALCITE-1467 is fixed
+ testAllApis(
+ 'f22.cast(Types.DOUBLE).power('f5),
+ "f22.cast(DOUBLE).power(f5)",
+ "power(CAST(f22 AS DOUBLE), f5)",
+ math.pow(2, 4.5F).toString)
+
+ testAllApis(
+ 'f22.cast(Types.DOUBLE).power('f6),
+ "f22.cast(DOUBLE).power(f6)",
+ "power(CAST(f22 AS DOUBLE), f6)",
+ math.pow(2, 4.6D).toString)
+
+ testAllApis(
+ 'f22.cast(Types.DOUBLE).power('f7),
+ "f22.cast(DOUBLE).power(f7)",
+ "power(CAST(f22 AS DOUBLE), f7)",
+ math.pow(2, 3).toString)
+
+ testAllApis(
+ 'f22.cast(Types.DOUBLE).power('f4),
+ "f22.cast(DOUBLE).power(f4)",
+ "power(CAST(f22 AS DOUBLE), f4)",
+ math.pow(2, 44L).toString)
+
+ testAllApis(
+ 'f6.power('f22.cast(Types.DOUBLE)),
+ "f6.power(f22.cast(DOUBLE))",
+ "power(f6, f22)",
+ math.pow(4.6D, 2).toString)
}
@Test
def testSqrt(): Unit = {
testAllApis(
+ 'f6.sqrt(),
+ "f6.sqrt",
+ "SQRT(f6)",
+ math.sqrt(4.6D).toString)
+
+ testAllApis(
+ 'f7.sqrt(),
+ "f7.sqrt",
+ "SQRT(f7)",
+ math.sqrt(3).toString)
+
+ testAllApis(
+ 'f4.sqrt(),
+ "f4.sqrt",
+ "SQRT(f4)",
+ math.sqrt(44L).toString)
+
+ testAllApis(
+ 'f22.cast(Types.DOUBLE).sqrt(),
+ "f22.cast(DOUBLE).sqrt",
+ "SQRT(CAST(f22 AS DOUBLE))",
+ math.sqrt(2.0).toString)
+
+ testAllApis(
+ 'f5.sqrt(),
+ "f5.sqrt",
+ "SQRT(f5)",
+ math.pow(4.5F, 0.5).toString)
+
+ testAllApis(
25.sqrt(),
"25.sqrt()",
"SQRT(25)",
@@ -944,7 +1028,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
// ----------------------------------------------------------------------------------------------
def testData = {
- val testData = new Row(22)
+ val testData = new Row(23)
testData.setField(0, "This is a test String.")
testData.setField(1, true)
testData.setField(2, 42.toByte)
@@ -967,6 +1051,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
testData.setField(19, 1467012213000L) // +16979 07:23:33.000
testData.setField(20, 25) // +2-01
testData.setField(21, null)
+ testData.setField(22, BigDecimal("2").bigDecimal)
testData
}
@@ -993,6 +1078,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
Types.TIMESTAMP,
Types.INTERVAL_MILLIS,
Types.INTERVAL_MONTHS,
- Types.BOOLEAN)).asInstanceOf[TypeInformation[Any]]
+ Types.BOOLEAN,
+ Types.DECIMAL)).asInstanceOf[TypeInformation[Any]]
}
}
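The new MathFunctions.power simply narrows the DECIMAL operand to a double
before applying Math.pow; a self-contained sketch of those semantics:

    import java.math.BigDecimal;

    public class PowerSemanticsSketch {
        public static void main(String[] args) {
            double base = 4.6d;
            BigDecimal exponent = new BigDecimal("2");
            // Mirrors MathFunctions.power(Double, JBigDecimal) above: the
            // BigDecimal exponent is converted with doubleValue() first.
            double result = Math.pow(base, exponent.doubleValue());
            System.out.println(result); // ~21.16, matching the f6/f22 test case
        }
    }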
[5/5] flink git commit: [FLINK-4315] [dataSet] [hadoopCompat] Annotate Hadoop-related methods in ExecutionEnvironment as @Deprecated.
Posted by fh...@apache.org.
[FLINK-4315] [dataSet] [hadoopCompat] Annotate Hadoop-related methods in ExecutionEnvironment as @Deprecated.
- Preparation to remove Hadoop dependency from flink-java
- Alternatives for the deprecated functionality are provided in flink-hadoop-compatibility via HadoopInputs
This closes #2637.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d61e1f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d61e1f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d61e1f2
Branch: refs/heads/master
Commit: 7d61e1f2fd0c9b0e3719b2d7252a164cfdf941c4
Parents: 4565170
Author: Evgeny_Kincharov <Ev...@epam.com>
Authored: Fri Oct 14 17:19:22 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 2 18:31:25 2016 +0100
----------------------------------------------------------------------
.../flink-hadoop-compatibility/pom.xml | 98 +++++++++++++
.../flink/hadoopcompatibility/HadoopInputs.java | 118 +++++++++++++++
.../flink/hadoopcompatibility/HadoopUtils.java | 52 +++++++
.../scala/HadoopInputs.scala | 143 +++++++++++++++++++
.../hadoopcompatibility/HadoopUtilsTest.java | 34 +++++
flink-java/pom.xml | 2 +-
.../flink/api/java/ExecutionEnvironment.java | 33 ++++-
.../flink/api/java/utils/ParameterTool.java | 3 +
.../java/utils/AbstractParameterToolTest.java | 71 +++++++++
.../flink/api/java/utils/ParameterToolTest.java | 46 +-----
.../flink/api/scala/ExecutionEnvironment.scala | 37 ++++-
.../hadoop/mapred/WordCountMapredITCase.java | 16 ++-
.../mapreduce/WordCountMapreduceITCase.java | 17 ++-
.../hadoop/mapred/WordCountMapredITCase.scala | 35 +++--
.../mapreduce/WordCountMapreduceITCase.scala | 26 +++-
15 files changed, 661 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
index aa818f6..8143a03 100644
--- a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
+++ b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
@@ -47,6 +47,21 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>${shading-artifact.name}</artifactId>
<version>${project.version}</version>
</dependency>
@@ -78,6 +93,89 @@ under the License.
<groupId>com.github.siom79.japicmp</groupId>
<artifactId>japicmp-maven-plugin</artifactId>
</plugin>
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.1.4</version>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <jvmArgs>
+ <jvmArg>-Xms128m</jvmArg>
+ <jvmArg>-Xmx512m</jvmArg>
+ </jvmArgs>
+ </configuration>
+ </plugin>
+
+ <!-- Eclipse Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes>
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+
+ <!-- Adding scala source directories to build path -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Scala Code Style, most of the configuration done via plugin management -->
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <configuration>
+ <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+ </configuration>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
new file mode 100644
index 0000000..9e8a3e4
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/**
+ * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
+ *
+ * It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat}
+ * and {@link org.apache.hadoop.mapreduce.InputFormat}.
+ *
+ * Key value pairs produced by the Hadoop InputFormats are converted into Flink
+ * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field
+ * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value.
+ *
+ */
+
+public final class HadoopInputs {
+ // ----------------------------------- Hadoop Input Format ---------------------------------------
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+ */
+ public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+ // set input path in JobConf
+ org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+ // return wrapping InputFormat
+ return createHadoopInput(mapredInputFormat, key, value, job);
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+ */
+ public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+ return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} to read a Hadoop sequence file for the given key and value classes.
+ *
+ * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat.
+ */
+ public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
+ return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.InputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+ */
+ public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+ return new HadoopInputFormat<>(mapredInputFormat, key, value, job);
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+ */
+ public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException
+ {
+ // set input path in Job
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+ // return wrapping InputFormat
+ return createHadoopInput(mapreduceInputFormat, key, value, job);
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
+ */
+ public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException
+ {
+ return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
+ }
+
+ /**
+ * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.InputFormat}.
+ *
+ * @return A Flink InputFormat that wraps the Hadoop InputFormat.
+ */
+ public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
+ org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job)
+ {
+ return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
new file mode 100644
index 0000000..97ca329
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.commons.cli.Option;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class to work with Apache Hadoop libraries.
+ */
+public class HadoopUtils {
+ /**
+ * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}
+ *
+ * @param args Input array arguments. It should be parsable by {@link GenericOptionsParser}
+ * @return A {@link ParameterTool}
+ * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
+ * @see GenericOptionsParser
+ */
+ public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException {
+ Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();
+ Map<String, String> map = new HashMap<String, String>();
+ for (Option option : options) {
+ String[] split = option.getValue().split("=");
+ map.put(split[0], split[1]);
+ }
+ return ParameterTool.fromMap(map);
+ }
+}
+
+
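A usage sketch for the new helper, mirroring the arguments exercised by
HadoopUtilsTest further below:

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.hadoopcompatibility.HadoopUtils;

    public class GenericOptionsSketch {
        public static void main(String[] args) throws Exception {
            // Hadoop-style -D options are parsed by GenericOptionsParser and
            // exposed as a Flink ParameterTool.
            ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(
                new String[]{"-D", "input=myInput", "-DexpectedCount=15"});
            System.out.println(params.getRequired("input"));     // myInput
            System.out.println(params.getLong("expectedCount")); // 15
        }
    }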
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
new file mode 100644
index 0000000..133a5f4
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.hadoopcompatibility.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.hadoop.mapreduce
+import org.apache.flink.api.scala.hadoop.mapred
+import org.apache.hadoop.fs.{Path => HadoopPath}
+import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
+import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat}
+
+/**
+ * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
+ *
+ * It provides methods to create Flink InputFormat wrappers for Hadoop
+ * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]].
+ *
+ * Key value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]] where
+ * the first field is the key and the second field is the value.
+ *
+ */
+object HadoopInputs {
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapred.FileInputFormat]].
+ */
+ def readHadoopFile[K, V](
+ mapredInputFormat: MapredFileInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ inputPath: String,
+ job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+ // set input path in JobConf
+ MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+ // wrap mapredInputFormat
+ createHadoopInput(mapredInputFormat, key, value, job)
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapred.FileInputFormat]].
+ */
+ def readHadoopFile[K, V](
+ mapredInputFormat: MapredFileInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+ readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that reads a Hadoop sequence
+ * file with the given key and value classes.
+ */
+ def readSequenceFile[K, V](
+ key: Class[K],
+ value: Class[V],
+ inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+ readHadoopFile(
+ new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
+ key,
+ value,
+ inputPath
+ )
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapred.InputFormat]].
+ */
+ def createHadoopInput[K, V](
+ mapredInputFormat: MapredInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+
+ new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+ */
+ def readHadoopFile[K, V](
+ mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ inputPath: String,
+ job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
+
+ // set input path in Job
+ MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+ // wrap mapreduceInputFormat
+ createHadoopInput(mapreduceInputFormat, key, value, job)
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+ */
+ def readHadoopFile[K, V](
+ mapreduceInputFormat: MapreduceFileInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] =
+ {
+ readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance)
+ }
+
+ /**
+ * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+ * [[org.apache.hadoop.mapreduce.InputFormat]].
+ */
+ def createHadoopInput[K, V](
+ mapreduceInputFormat: MapreduceInputFormat[K, V],
+ key: Class[K],
+ value: Class[V],
+ job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
+
+ new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
new file mode 100644
index 0000000..6f7673b
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.flink.api.java.utils.AbstractParameterToolTest;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class HadoopUtilsTest extends AbstractParameterToolTest {
+
+ @Test
+ public void testParamsFromGenericOptionsParser() throws IOException {
+ ParameterTool parameter = HadoopUtils.paramsFromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"});
+ validate(parameter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 6924da8..5bc81c6 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -84,7 +84,7 @@ under the License.
<artifactId>japicmp-maven-plugin</artifactId>
</plugin>
- <!-- Because flink-scala and flink-avro uses it in tests -->
+ <!-- Because flink-scala, flink-avro and flink-hadoop-compatibility uses it in tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index add8531..964eed1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -585,9 +585,12 @@ public abstract class ExecutionEnvironment {
// ----------------------------------- Hadoop Input Format ---------------------------------------
/**
- * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. The
- * given inputName is set on the given job.
+ * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}.
+ *
+ * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String, JobConf)}
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
@@ -600,7 +603,11 @@ public abstract class ExecutionEnvironment {
/**
* Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat}
* A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
- */
+ *
+ * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readSequenceFile(Class<K>, Class<V>, String)}
+ * from the flink-hadoop-compatibility module.
+ */
+ @Deprecated
@PublicEvolving
public <K,V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
@@ -609,7 +616,11 @@ public abstract class ExecutionEnvironment {
/**
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
* {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+ *
+ * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
@@ -617,7 +628,11 @@ public abstract class ExecutionEnvironment {
/**
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
+ *
+ * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V>, Class<K>, Class<V>, JobConf)}
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
@@ -628,7 +643,11 @@ public abstract class ExecutionEnvironment {
/**
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
* given inputName is set on the given job.
+ *
+ * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String, Job)}
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);
@@ -642,7 +661,11 @@ public abstract class ExecutionEnvironment {
/**
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
* {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+ *
+ * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
@@ -650,7 +673,11 @@ public abstract class ExecutionEnvironment {
/**
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
+ *
+ * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V>, Class<K>, Class<V>, Job)}
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
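A hedged migration sketch from the deprecated ExecutionEnvironment methods to
the new HadoopInputs factory; the input format, types, and path below are
illustrative assumptions:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class HadoopMigrationSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Deprecated by this commit:
            // env.readHadoopFile(new TextInputFormat(),
            //     LongWritable.class, Text.class, "hdfs:///input/path");

            // Replacement: wrap the Hadoop format via flink-hadoop-compatibility
            // and hand it to the generic createInput method.
            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                HadoopInputs.readHadoopFile(new TextInputFormat(),
                    LongWritable.class, Text.class, "hdfs:///input/path"));

            input.print();
        }
    }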
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 8f504e4..8e15441 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -191,7 +191,10 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
* @return A {@link ParameterTool}
* @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
* @see GenericOptionsParser
+ * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopUtils#paramsFromGenericOptionsParser(String[])}
+ * from project flink-hadoop-compatibility
*/
+ @Deprecated
@PublicEvolving
public static ParameterTool fromGenericOptionsParser(String[] args) throws IOException {
Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
new file mode 100644
index 0000000..9aa9a95
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+public abstract class AbstractParameterToolTest {
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ protected void validate(ParameterTool parameter) {
+ ClosureCleaner.ensureSerializable(parameter);
+ Assert.assertEquals("myInput", parameter.getRequired("input"));
+ Assert.assertEquals("myDefaultValue", parameter.get("output", "myDefaultValue"));
+ Assert.assertEquals(null, parameter.get("whatever"));
+ Assert.assertEquals(15L, parameter.getLong("expectedCount", -1L));
+ Assert.assertTrue(parameter.getBoolean("thisIsUseful", true));
+ Assert.assertEquals(42, parameter.getByte("myDefaultByte", (byte) 42));
+ Assert.assertEquals(42, parameter.getShort("myDefaultShort", (short) 42));
+
+ Configuration config = parameter.getConfiguration();
+ Assert.assertEquals(15L, config.getLong("expectedCount", -1L));
+
+ Properties props = parameter.getProperties();
+ Assert.assertEquals("myInput", props.getProperty("input"));
+ props = null;
+
+ // -------- test the default file creation ------------
+ try {
+ String pathToFile = tmp.newFile().getAbsolutePath();
+ parameter.createPropertiesFile(pathToFile);
+ Properties defaultProps = new Properties();
+ try (FileInputStream fis = new FileInputStream(pathToFile)) {
+ defaultProps.load(fis);
+ }
+
+ Assert.assertEquals("myDefaultValue", defaultProps.get("output"));
+ Assert.assertEquals("-1", defaultProps.get("expectedCount"));
+ Assert.assertTrue(defaultProps.containsKey("input"));
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
index 605f033..9b63985 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
@@ -18,25 +18,17 @@
package org.apache.flink.api.java.utils;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Properties;
-public class ParameterToolTest {
-
- @Rule
- public TemporaryFolder tmp = new TemporaryFolder();
+public class ParameterToolTest extends AbstractParameterToolTest {
// ----- Parser tests -----------------
@@ -162,40 +154,4 @@ public class ParameterToolTest {
ParameterTool parameter = ParameterTool.fromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"});
validate(parameter);
}
-
- private void validate(ParameterTool parameter) {
- ClosureCleaner.ensureSerializable(parameter);
- Assert.assertEquals("myInput", parameter.getRequired("input"));
- Assert.assertEquals("myDefaultValue", parameter.get("output", "myDefaultValue"));
- Assert.assertEquals(null, parameter.get("whatever"));
- Assert.assertEquals(15L, parameter.getLong("expectedCount", -1L));
- Assert.assertTrue(parameter.getBoolean("thisIsUseful", true));
- Assert.assertEquals(42, parameter.getByte("myDefaultByte", (byte) 42));
- Assert.assertEquals(42, parameter.getShort("myDefaultShort", (short) 42));
-
- Configuration config = parameter.getConfiguration();
- Assert.assertEquals(15L, config.getLong("expectedCount", -1L));
-
- Properties props = parameter.getProperties();
- Assert.assertEquals("myInput", props.getProperty("input"));
- props = null;
-
- // -------- test the default file creation ------------
- try {
- String pathToFile = tmp.newFile().getAbsolutePath();
- parameter.createPropertiesFile(pathToFile);
- Properties defaultProps = new Properties();
- try (FileInputStream fis = new FileInputStream(pathToFile)) {
- defaultProps.load(fis);
- }
-
- Assert.assertEquals("myDefaultValue", defaultProps.get("output"));
- Assert.assertEquals("-1", defaultProps.get("expectedCount"));
- Assert.assertTrue(defaultProps.containsKey("input"));
-
- } catch (IOException e) {
- Assert.fail(e.getMessage());
- e.printStackTrace();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 4f9d569..18aab07 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -43,7 +43,7 @@ import scala.collection.JavaConverters._
import scala.reflect.ClassTag
/**
- * The ExecutionEnviroment is the context in which a program is executed. A local environment will
+ * The ExecutionEnvironment is the context in which a program is executed. A local environment will
* cause execution in the current JVM, a remote environment will cause execution on a remote
* cluster installation.
*
@@ -412,7 +412,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The
* given inputName is set on the given job.
+ *
+ * @deprecated Please use
+ * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
def readHadoopFile[K, V](
mapredInputFormat: MapredFileInputFormat[K, V],
@@ -429,7 +434,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A
* [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
+ *
+ * @deprecated Please use
+ * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
def readHadoopFile[K, V](
mapredInputFormat: MapredFileInputFormat[K, V],
@@ -443,7 +453,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Creates a [[DataSet]] from [[org.apache.hadoop.mapred.SequenceFileInputFormat]]
* A [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
+ *
+ * @deprecated Please use
+ * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readSequenceFile]]
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
def readSequenceFile[K, V](
key: Class[K],
@@ -456,7 +471,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]].
+ *
+ * @deprecated Please use
+ * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]]
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
def createHadoopInput[K, V](
mapredInputFormat: MapredInputFormat[K, V],
@@ -471,7 +491,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
* The given inputName is set on the given job.
+ *
+ * @deprecated Please use
+ * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
def readHadoopFile[K, V](
mapreduceInputFormat: MapreduceFileInputFormat[K, V],
@@ -489,7 +514,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* Creates a [[DataSet]] from the given
* [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A
* [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created.
+ *
+ * @deprecated Please use
+ * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
def readHadoopFile[K, V](
mapreduceInputFormat: MapreduceFileInputFormat[K, V],
@@ -502,7 +532,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]].
+ *
+ * @deprecated Please use
+ * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]]
+ * from the flink-hadoop-compatibility module.
*/
+ @Deprecated
@PublicEvolving
def createHadoopInput[K, V](
mapreduceInputFormat: MapreduceInputFormat[K, V],
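All of the deprecated readers above migrate the same way: build the input format with HadoopInputs and hand it to createInput, exactly as the test changes that follow demonstrate. A minimal sketch using the Java API (the Scala variants use org.apache.flink.hadoopcompatibility.scala.HadoopInputs, as the scaladocs above note; the input path is a placeholder):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

public class HadoopInputsMigration {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // was: env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, path)
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            HadoopInputs.readHadoopFile(new TextInputFormat(),
                LongWritable.class, Text.class, "hdfs:///path/to/input"));
        input.first(5).print();
        // readSequenceFile and createHadoopInput migrate the same way:
        // wrap the HadoopInputs result in env.createInput(...).
    }
}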
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
index a0e3468..80f311a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
@@ -52,11 +53,24 @@ public class WordCountMapredITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
+ internalRun(true);
+ postSubmit();
+ resultPath = getTempDirPath("result2");
+ internalRun(false);
+ }
+
+ private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple2<LongWritable, Text>> input;
- DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(),
+ if (isTestDeprecatedAPI) {
+ input = env.readHadoopFile(new TextInputFormat(),
LongWritable.class, Text.class, textPath);
+ } else {
+ input = env.createInput(readHadoopFile(new TextInputFormat(),
+ LongWritable.class, Text.class, textPath));
+ }
DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
index fee49bf..3293770 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
+import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
@@ -52,11 +53,23 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ internalRun(true);
+ postSubmit();
+ resultPath = getTempDirPath("result2");
+ internalRun(false);
+ }
+ private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(),
+ DataSet<Tuple2<LongWritable, Text>> input;
+ if (isTestDeprecatedAPI) {
+ input = env.readHadoopFile(new TextInputFormat(),
LongWritable.class, Text.class, textPath);
+ } else {
+ input = env.createInput(readHadoopFile(new TextInputFormat(),
+ LongWritable.class, Text.class, textPath));
+ }
DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
index b09ecc4..6b414d6 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
@@ -18,12 +18,12 @@
package org.apache.flink.api.scala.hadoop.mapred
import org.apache.flink.api.scala._
-
+import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
+import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{Text, LongWritable}
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, TextInputFormat}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextInputFormat, TextOutputFormat}
class WordCountMapredITCase extends JavaProgramTestBase {
protected var textPath: String = null
@@ -39,21 +39,27 @@ class WordCountMapredITCase extends JavaProgramTestBase {
resultPath, Array[String](".", "_"))
}
- protected def testProgram() {
+ private def internalRun (testDeprecatedAPI: Boolean): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val input =
- env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+ if (testDeprecatedAPI) {
+ env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+ } else {
+ env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
+ classOf[Text], textPath))
+ }
- val text = input map { _._2.toString }
- val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
- .map { (_, 1) }
+ val counts = input
+ .map(_._2.toString)
+ .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
.groupBy(0)
.sum(1)
- val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }
+ val words = counts
+ .map( t => (new Text(t._1), new LongWritable(t._2)) )
- val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable](
+ val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
new TextOutputFormat[Text, LongWritable],
new JobConf)
hadoopOutputFormat.getJobConf.set("mapred.textoutputformat.separator", " ")
@@ -64,5 +70,12 @@ class WordCountMapredITCase extends JavaProgramTestBase {
env.execute("Hadoop Compat WordCount")
}
+
+ protected def testProgram() {
+ internalRun(testDeprecatedAPI = true)
+ postSubmit()
+ resultPath = getTempDirPath("result2")
+ internalRun(testDeprecatedAPI = false)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
index de2d376..e393d23 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
@@ -19,6 +19,7 @@
package org.apache.flink.api.scala.hadoop.mapreduce
import org.apache.flink.api.scala._
+import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.flink.test.testdata.WordCountData
import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
import org.apache.hadoop.fs.Path
@@ -42,21 +43,34 @@ class WordCountMapreduceITCase extends JavaProgramTestBase {
}
protected def testProgram() {
+ internalRun(testDeprecatedAPI = true)
+ postSubmit()
+ resultPath = getTempDirPath("result2")
+ internalRun(testDeprecatedAPI = false)
+ }
+
+ private def internalRun (testDeprecatedAPI: Boolean): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val input =
- env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+ if (testDeprecatedAPI) {
+ env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+ } else {
+ env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
+ classOf[Text], textPath))
+ }
- val text = input map { _._2.toString }
- val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
- .map { (_, 1) }
+ val counts = input
+ .map(_._2.toString)
+ .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
.groupBy(0)
.sum(1)
- val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }
+ val words = counts
+ .map( t => (new Text(t._1), new LongWritable(t._2)) )
val job = Job.getInstance()
- val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable](
+ val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
new TextOutputFormat[Text, LongWritable],
job)
hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " ")