You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/10/13 11:48:03 UTC

flink git commit: [FLINK-7763] [table] Fix testing RowSink for enabled object reuse.

Repository: flink
Updated Branches:
  refs/heads/master 660a45ca1 -> 57333c622


[FLINK-7763] [table] Fix testing RowSink for enabled object reuse.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57333c62
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57333c62
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57333c62

Branch: refs/heads/master
Commit: 57333c62271253248bf3699be31ae7224e97de75
Parents: 660a45c
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Oct 13 11:30:39 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 13 11:30:39 2017 +0200

----------------------------------------------------------------------
 .../runtime/stream/table/TableSinkITCase.scala    | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57333c62/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index c5b82fe..07934b8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -51,6 +51,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testInsertIntoRegisteredTableSink(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
     val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -86,6 +87,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
     val path = tmpFile.toURI.toString
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setParallelism(4)
 
@@ -109,6 +111,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testAppendSinkOnAppendTable(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -137,6 +140,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testRetractSinkOnUpdatingTable(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -168,6 +172,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testRetractSinkOnAppendTable(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -202,6 +207,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -236,6 +242,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -273,6 +280,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -310,6 +318,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -347,6 +356,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -384,6 +394,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testToAppendStreamRowtime(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear
@@ -478,6 +489,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test(expected = classOf[TableException])
   def testToAppendStreamMultiRowtime(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -496,6 +508,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
   @Test(expected = classOf[TableException])
   def testToRetractStreamMultiRowtime(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
@@ -658,8 +671,11 @@ object RowCollector {
     new mutable.ArrayBuffer[JTuple2[JBool, Row]]()
 
   def addValue(value: JTuple2[JBool, Row]): Unit = {
+
+    // make a copy
+    val copy = new JTuple2[JBool, Row](value.f0, Row.copy(value.f1))
     sink.synchronized {
-      sink += value
+      sink += copy
     }
   }