You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2017/03/04 01:46:43 UTC
flink git commit: [FLINK-5586] [table] Extend TableProgramsClusterTestBase for object reuse mode
Repository: flink
Updated Branches:
refs/heads/master 728c936dd -> d5c320c3b
[FLINK-5586] [table] Extend TableProgramsClusterTestBase for object reuse mode
This closes #3339.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5c320c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5c320c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5c320c3
Branch: refs/heads/master
Commit: d5c320c3b6ee7b99bb2fd72500a0caa7fea24083
Parents: 728c936
Author: Kurt Young <yk...@gmail.com>
Authored: Fri Feb 17 17:02:23 2017 +0800
Committer: Kurt Young <ku...@apache.org>
Committed: Sat Mar 4 09:11:02 2017 +0800
----------------------------------------------------------------------
.../main/java/org/apache/flink/types/Row.java | 14 ++++++
.../api/java/batch/sql/GroupingSetsITCase.java | 4 +-
.../table/api/scala/batch/sql/SortITCase.scala | 33 +++++++++++---
.../api/scala/batch/table/SortITCase.scala | 47 +++++++++++++++++---
.../utils/TableProgramsClusterTestBase.scala | 20 ++++++++-
.../utils/TableProgramsCollectionTestBase.scala | 18 +++++++-
.../batch/utils/TableProgramsTestBase.scala | 9 ----
.../runtime/dataset/DataSetCalcITCase.scala | 6 ++-
.../DataSetUserDefinedFunctionITCase.scala | 6 ++-
.../dataset/DataSetWindowAggregateITCase.scala | 7 ++-
.../test/util/MultipleProgramsTestBase.java | 12 ++---
11 files changed, 139 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-core/src/main/java/org/apache/flink/types/Row.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java
index f9a5add..0b2120f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Row.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Row.java
@@ -139,4 +139,18 @@ public class Row implements Serializable{
}
return row;
}
+
+ /**
+ * Creates a new Row that is a shallow copy of another row: field values are shared, not cloned.
+ *
+ * @param row The row being copied.
+ * @return The newly created Row holding the same field values
+ */
+ public static Row copy(Row row) {
+ Row ret = new Row(row.getArity());
+ for (int i = 0; i < row.getArity(); ++i) {
+ ret.setField(i, row.getField(i));
+ }
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
index 54f7da7..3f611d5 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
@@ -50,8 +50,8 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
private final static String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
private BatchTableEnvironment tableEnv;
- public GroupingSetsITCase(TableConfigMode tableConfigMode) {
- super(tableConfigMode);
+ public GroupingSetsITCase(TestExecutionMode mode, TableConfigMode tableConfigMode) {
+ super(mode, tableConfigMode);
}
@Before
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
index c577797..bc04cc7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit._
@@ -32,9 +33,11 @@ import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import scala.collection.JavaConverters._
+import scala.collection.mutable
@RunWith(classOf[Parameterized])
-class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) {
+class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
+ extends TableProgramsClusterTestBase(mode, configMode) {
private def getExecutionEnvironment = {
val env = ExecutionEnvironment.getExecutionEnvironment
@@ -59,7 +62,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
val expected = sortExpectedly(tupleDataSetStrings)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
def rowOrdering = Ordering.by((r : Row) => {
// ordering for this tuple will fall into the previous defined tupleOrdering,
@@ -91,7 +99,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
val result = results.
filterNot(_.isEmpty)
@@ -117,7 +130,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
val result = results
.filterNot(_.isEmpty)
@@ -143,7 +161,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
def rowOrdering = Ordering.by((r : Row) => {
// ordering for this tuple will fall into the previous defined tupleOrdering,
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
index a84d8a9..09d3c04 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit._
@@ -32,9 +33,11 @@ import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import scala.collection.JavaConverters._
+import scala.collection.mutable
@RunWith(classOf[Parameterized])
-class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) {
+class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
+ extends TableProgramsClusterTestBase(mode, configMode) {
private def getExecutionEnvironment = {
val env = ExecutionEnvironment.getExecutionEnvironment
@@ -56,7 +59,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
val expected = sortExpectedly(tupleDataSetStrings)
// squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ val results = t.toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
val result = results
.filterNot(_.isEmpty)
@@ -79,7 +87,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
val expected = sortExpectedly(tupleDataSetStrings)
// squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ val results = t.toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
val result = results
.filterNot(_.isEmpty)
@@ -102,7 +115,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
val expected = sortExpectedly(tupleDataSetStrings)
// squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ val results = t.toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
def rowOrdering = Ordering.by((r : Row) => {
// ordering for this tuple will fall into the previous defined tupleOrdering,
@@ -131,7 +149,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
// squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ val results = t.toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
val result = results
.filterNot(_.isEmpty)
@@ -154,7 +177,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
// squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ val results = t.toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
val result = results
.filterNot(_.isEmpty)
@@ -177,7 +205,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
// squash all rows inside a partition into one element
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ val results = t.toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
implicit def rowOrdering = Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int])
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
index b82ea9f..9313b5b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
@@ -18,8 +18,13 @@
package org.apache.flink.table.api.scala.batch.utils
+import java.util
+
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
/**
* This test base provides full cluster-like integration tests for batch programs. Only runtime
@@ -27,6 +32,19 @@ import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
* (e.g. [[org.apache.flink.table.runtime.dataset.DataSetWindowAggregateITCase]])
*/
class TableProgramsClusterTestBase(
+ executionMode: TestExecutionMode,
tableConfigMode: TableConfigMode)
- extends TableProgramsTestBase(TestExecutionMode.CLUSTER, tableConfigMode) {
+ extends TableProgramsTestBase(executionMode, tableConfigMode) {
+}
+
+object TableProgramsClusterTestBase {
+
+ @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+ def parameters(): util.Collection[Array[java.lang.Object]] = {
+ Seq[Array[AnyRef]](
+ Array(TestExecutionMode.CLUSTER, TableProgramsTestBase.DEFAULT),
+ Array(TestExecutionMode.CLUSTER_OBJECT_REUSE, TableProgramsTestBase.DEFAULT)
+ )
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
index ba0ea61..cd7c12d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
@@ -18,8 +18,13 @@
package org.apache.flink.table.api.scala.batch.utils
+import java.util
+
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
/**
* This test base provides lightweight integration tests for batch programs. However, it does
@@ -27,6 +32,15 @@ import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
* use [[TableProgramsClusterTestBase]].
*/
class TableProgramsCollectionTestBase(
- tableConfigMode: TableConfigMode)
- extends TableProgramsTestBase(TestExecutionMode.COLLECTION, tableConfigMode) {
+ tableConfigMode: TableConfigMode)
+ extends TableProgramsTestBase(TestExecutionMode.COLLECTION, tableConfigMode) {
+}
+
+object TableProgramsCollectionTestBase {
+
+ @Parameterized.Parameters(name = "Table config = {0}")
+ def parameters(): util.Collection[Array[java.lang.Object]] = {
+ Seq[Array[AnyRef]](Array(TableProgramsTestBase.DEFAULT))
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
index 586d716..cf9d947 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
@@ -18,15 +18,10 @@
package org.apache.flink.table.api.scala.batch.utils
-import java.util
-
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{NO_NULL, TableConfigMode}
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConversions._
class TableProgramsTestBase(
mode: TestExecutionMode,
@@ -50,8 +45,4 @@ object TableProgramsTestBase {
val DEFAULT = TableConfigMode(nullCheck = true)
val NO_NULL = TableConfigMode(nullCheck = false)
- @Parameterized.Parameters(name = "Table config = {0}")
- def parameters(): util.Collection[Array[java.lang.Object]] = {
- Seq[Array[AnyRef]](Array(DEFAULT))
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala
index f0b3b44..dc6ab1c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.expressions.utils.{RichFunc1, RichFunc2, RichFunc3}
import org.apache.flink.table.utils._
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit.Test
@@ -36,8 +37,9 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class DataSetCalcITCase(
- configMode: TableConfigMode)
- extends TableProgramsClusterTestBase(configMode) {
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsClusterTestBase(mode, configMode) {
@Test
def testUserDefinedScalarFunctionWithParameter(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
index 3d20803..33b2439 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
import org.apache.flink.table.expressions.utils.{Func13, RichFunc2}
import org.apache.flink.table.utils._
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit.Test
@@ -39,8 +40,9 @@ import scala.collection.mutable
@RunWith(classOf[Parameterized])
class DataSetUserDefinedFunctionITCase(
- configMode: TableConfigMode)
- extends TableProgramsClusterTestBase(configMode) {
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsClusterTestBase(mode, configMode) {
@Test
def testCrossJoin(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
index 882f4b6..d57f4f7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
@@ -31,12 +31,15 @@ import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.apache.flink.table.api.ValidationException
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
-class DataSetWindowAggregateITCase(configMode: TableConfigMode)
- extends TableProgramsClusterTestBase(configMode) {
+class DataSetWindowAggregateITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsClusterTestBase(mode, configMode) {
val data = List(
(1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 4e83245..2043cd0 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -62,7 +62,8 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
*/
public enum TestExecutionMode {
CLUSTER,
- COLLECTION
+ CLUSTER_OBJECT_REUSE,
+ COLLECTION,
}
// ------------------------------------------------------------------------
@@ -85,12 +86,13 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
switch(mode){
case CLUSTER:
- TestEnvironment clusterEnv = new TestEnvironment(cluster, 4);
- clusterEnv.setAsContext();
+ new TestEnvironment(cluster, 4).setAsContext();
+ break;
+ case CLUSTER_OBJECT_REUSE:
+ new TestEnvironment(cluster, 4, true).setAsContext();
break;
case COLLECTION:
- CollectionTestEnvironment collectionEnv = new CollectionTestEnvironment();
- collectionEnv.setAsContext();
+ new CollectionTestEnvironment().setAsContext();
break;
}
}