You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2017/03/04 01:46:43 UTC

flink git commit: [FLINK-5586] [table] Extend TableProgramsClusterTestBase for object reuse mode

Repository: flink
Updated Branches:
  refs/heads/master 728c936dd -> d5c320c3b


[FLINK-5586] [table] Extend TableProgramsClusterTestBase for object reuse mode

This closes #3339.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5c320c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5c320c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5c320c3

Branch: refs/heads/master
Commit: d5c320c3b6ee7b99bb2fd72500a0caa7fea24083
Parents: 728c936
Author: Kurt Young <yk...@gmail.com>
Authored: Fri Feb 17 17:02:23 2017 +0800
Committer: Kurt Young <ku...@apache.org>
Committed: Sat Mar 4 09:11:02 2017 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/flink/types/Row.java   | 14 ++++++
 .../api/java/batch/sql/GroupingSetsITCase.java  |  4 +-
 .../table/api/scala/batch/sql/SortITCase.scala  | 33 +++++++++++---
 .../api/scala/batch/table/SortITCase.scala      | 47 +++++++++++++++++---
 .../utils/TableProgramsClusterTestBase.scala    | 20 ++++++++-
 .../utils/TableProgramsCollectionTestBase.scala | 18 +++++++-
 .../batch/utils/TableProgramsTestBase.scala     |  9 ----
 .../runtime/dataset/DataSetCalcITCase.scala     |  6 ++-
 .../DataSetUserDefinedFunctionITCase.scala      |  6 ++-
 .../dataset/DataSetWindowAggregateITCase.scala  |  7 ++-
 .../test/util/MultipleProgramsTestBase.java     | 12 ++---
 11 files changed, 139 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-core/src/main/java/org/apache/flink/types/Row.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java
index f9a5add..0b2120f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Row.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Row.java
@@ -139,4 +139,18 @@ public class Row implements Serializable{
 		}
 		return row;
 	}
+
+	/**
+	 * Creates a new Row which is copied from another row.
+	 *
+	 * @param row The row being copied.
+	 * @return The cloned new Row.
+	 */
+	public static Row copy(Row row) {
+		Row ret = new Row(row.getArity());
+		for (int i = 0; i < row.getArity(); ++i) {
+			ret.setField(i, row.getField(i));
+		}
+		return ret;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
index 54f7da7..3f611d5 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
@@ -50,8 +50,8 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 	private final static String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
 	private BatchTableEnvironment tableEnv;
 
-	public GroupingSetsITCase(TableConfigMode tableConfigMode) {
-		super(tableConfigMode);
+	public GroupingSetsITCase(TestExecutionMode mode, TableConfigMode tableConfigMode) {
+		super(mode, tableConfigMode);
 	}
 
 	@Before

http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
index c577797..bc04cc7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit._
@@ -32,9 +33,11 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
-class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) {
+class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
+    extends TableProgramsClusterTestBase(mode, configMode) {
 
   private def getExecutionEnvironment = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -59,7 +62,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
 
     val expected = sortExpectedly(tupleDataSetStrings)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+      // the rows need to be copied in object reuse mode
+      val copied = new mutable.ArrayBuffer[Row]
+      rows.foreach(r => copied += Row.copy(r))
+      Seq(copied)
+    }).collect()
 
     def rowOrdering = Ordering.by((r : Row) => {
       // ordering for this tuple will fall into the previous defined tupleOrdering,
@@ -91,7 +99,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
 
     val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+      // the rows need to be copied in object reuse mode
+      val copied = new mutable.ArrayBuffer[Row]
+      rows.foreach(r => copied += Row.copy(r))
+      Seq(copied)
+    }).collect()
 
     val result = results.
         filterNot(_.isEmpty)
@@ -117,7 +130,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
 
     val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+      // the rows need to be copied in object reuse mode
+      val copied = new mutable.ArrayBuffer[Row]
+      rows.foreach(r => copied += Row.copy(r))
+      Seq(copied)
+    }).collect()
 
     val result = results
       .filterNot(_.isEmpty)
@@ -143,7 +161,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
 
     val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+      // the rows need to be copied in object reuse mode
+      val copied = new mutable.ArrayBuffer[Row]
+      rows.foreach(r => copied += Row.copy(r))
+      Seq(copied)
+    }).collect()
 
     def rowOrdering = Ordering.by((r : Row) => {
       // ordering for this tuple will fall into the previous defined tupleOrdering,

http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
index a84d8a9..09d3c04 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit._
@@ -32,9 +33,11 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
-class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) {
+class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
+    extends TableProgramsClusterTestBase(mode, configMode) {
 
   private def getExecutionEnvironment = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -56,7 +59,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
 
     val expected = sortExpectedly(tupleDataSetStrings)
     // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+    val results = t.toDataSet[Row].mapPartition(rows => {
+      // the rows need to be copied in object reuse mode
+      val copied = new mutable.ArrayBuffer[Row]
+      rows.foreach(r => copied += Row.copy(r))
+      Seq(copied)
+    }).collect()
 
     val result = results
         .filterNot(_.isEmpty)
@@ -79,7 +87,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
 
     val expected = sortExpectedly(tupleDataSetStrings)
     // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+    val results = t.toDataSet[Row].mapPartition(rows => {
+      // the rows need to be copied in object reuse mode
+      val copied = new mutable.ArrayBuffer[Row]
+      rows.foreach(r => copied += Row.copy(r))
+      Seq(copied)
+    }).collect()
 
     val result = results
         .filterNot(_.isEmpty)
@@ -102,7 +115,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
 
     val expected = sortExpectedly(tupleDataSetStrings)
     // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+    val results = t.toDataSet[Row].mapPartition(rows => {
+      // the rows need to be copied in object reuse mode
+      val copied = new mutable.ArrayBuffer[Row]
+      rows.foreach(r => copied += Row.copy(r))
+      Seq(copied)
+    }).collect()
 
     def rowOrdering = Ordering.by((r : Row) => {
       // ordering for this tuple will fall into the previous defined tupleOrdering,
@@ -131,7 +149,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
 
     val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
     // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+    val results = t.toDataSet[Row].mapPartition(rows => {
+      // the rows need to be copied in object reuse mode
+      val copied = new mutable.ArrayBuffer[Row]
+      rows.foreach(r => copied += Row.copy(r))
+      Seq(copied)
+    }).collect()
 
     val result = results
         .filterNot(_.isEmpty)
@@ -154,7 +177,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
 
     val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
     // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+    val results = t.toDataSet[Row].mapPartition(rows => {
+      // the rows need to be copied in object reuse mode
+      val copied = new mutable.ArrayBuffer[Row]
+      rows.foreach(r => copied += Row.copy(r))
+      Seq(copied)
+    }).collect()
 
     val result = results
         .filterNot(_.isEmpty)
@@ -177,7 +205,12 @@ class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBa
 
     val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
     // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+    val results = t.toDataSet[Row].mapPartition(rows => {
+      // the rows need to be copied in object reuse mode
+      val copied = new mutable.ArrayBuffer[Row]
+      rows.foreach(r => copied += Row.copy(r))
+      Seq(copied)
+    }).collect()
 
     implicit def rowOrdering = Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int])
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
index b82ea9f..9313b5b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
@@ -18,8 +18,13 @@
 
 package org.apache.flink.table.api.scala.batch.utils
 
+import java.util
+
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
 
 /**
   * This test base provides full cluster-like integration tests for batch programs. Only runtime
@@ -27,6 +32,19 @@ import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
   * (e.g. [[org.apache.flink.table.runtime.dataset.DataSetWindowAggregateITCase]])
   */
 class TableProgramsClusterTestBase(
+    executionMode: TestExecutionMode,
     tableConfigMode: TableConfigMode)
-  extends TableProgramsTestBase(TestExecutionMode.CLUSTER, tableConfigMode) {
+    extends TableProgramsTestBase(executionMode, tableConfigMode) {
+}
+
+object TableProgramsClusterTestBase {
+
+  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](
+      Array(TestExecutionMode.CLUSTER, TableProgramsTestBase.DEFAULT),
+      Array(TestExecutionMode.CLUSTER_OBJECT_REUSE, TableProgramsTestBase.DEFAULT)
+    )
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
index ba0ea61..cd7c12d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
@@ -18,8 +18,13 @@
 
 package org.apache.flink.table.api.scala.batch.utils
 
+import java.util
+
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
 
 /**
   * This test base provides lightweight integration tests for batch programs. However, it does
@@ -27,6 +32,15 @@ import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
   * use [[TableProgramsClusterTestBase]].
   */
 class TableProgramsCollectionTestBase(
-  tableConfigMode: TableConfigMode)
-  extends TableProgramsTestBase(TestExecutionMode.COLLECTION, tableConfigMode) {
+    tableConfigMode: TableConfigMode)
+    extends TableProgramsTestBase(TestExecutionMode.COLLECTION, tableConfigMode) {
+}
+
+object TableProgramsCollectionTestBase {
+
+  @Parameterized.Parameters(name = "Table config = {0}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](Array(TableProgramsTestBase.DEFAULT))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
index 586d716..cf9d947 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
@@ -18,15 +18,10 @@
 
 package org.apache.flink.table.api.scala.batch.utils
 
-import java.util
-
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{NO_NULL, TableConfigMode}
 import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConversions._
 
 class TableProgramsTestBase(
     mode: TestExecutionMode,
@@ -50,8 +45,4 @@ object TableProgramsTestBase {
   val DEFAULT = TableConfigMode(nullCheck = true)
   val NO_NULL = TableConfigMode(nullCheck = false)
 
-  @Parameterized.Parameters(name = "Table config = {0}")
-  def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](Array(DEFAULT))
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala
index f0b3b44..dc6ab1c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetCalcITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.expressions.utils.{RichFunc1, RichFunc2, RichFunc3}
 import org.apache.flink.table.utils._
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit.Test
@@ -36,8 +37,9 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class DataSetCalcITCase(
-  configMode: TableConfigMode)
-  extends TableProgramsClusterTestBase(configMode) {
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsClusterTestBase(mode, configMode) {
 
   @Test
   def testUserDefinedScalarFunctionWithParameter(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
index 3d20803..33b2439 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
 import org.apache.flink.table.expressions.utils.{Func13, RichFunc2}
 import org.apache.flink.table.utils._
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit.Test
@@ -39,8 +40,9 @@ import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
 class DataSetUserDefinedFunctionITCase(
-  configMode: TableConfigMode)
-  extends TableProgramsClusterTestBase(configMode) {
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+    extends TableProgramsClusterTestBase(mode, configMode) {
 
   @Test
   def testCrossJoin(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
index 882f4b6..d57f4f7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
@@ -31,12 +31,15 @@ import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.apache.flink.table.api.ValidationException
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class DataSetWindowAggregateITCase(configMode: TableConfigMode)
-  extends TableProgramsClusterTestBase(configMode) {
+class DataSetWindowAggregateITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+    extends TableProgramsClusterTestBase(mode, configMode) {
 
   val data = List(
     (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),

http://git-wip-us.apache.org/repos/asf/flink/blob/d5c320c3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 4e83245..2043cd0 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -62,7 +62,8 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 	 */
 	public enum TestExecutionMode {
 		CLUSTER,
-		COLLECTION
+		CLUSTER_OBJECT_REUSE,
+		COLLECTION,
 	}
 	
 	// ------------------------------------------------------------------------
@@ -85,12 +86,13 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 		
 		switch(mode){
 			case CLUSTER:
-				TestEnvironment clusterEnv = new TestEnvironment(cluster, 4);
-				clusterEnv.setAsContext();
+				new TestEnvironment(cluster, 4).setAsContext();
+				break;
+			case CLUSTER_OBJECT_REUSE:
+				new TestEnvironment(cluster, 4, true).setAsContext();
 				break;
 			case COLLECTION:
-				CollectionTestEnvironment collectionEnv = new CollectionTestEnvironment();
-				collectionEnv.setAsContext();
+				new CollectionTestEnvironment().setAsContext();
 				break;
 		}
 	}