You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/10/13 11:48:03 UTC
flink git commit: [FLINK-7763] [table] Fix testing RowSink for enabled object reuse.
Repository: flink
Updated Branches:
refs/heads/master 660a45ca1 -> 57333c622
[FLINK-7763] [table] Fix testing RowSink for enabled object reuse.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57333c62
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57333c62
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57333c62
Branch: refs/heads/master
Commit: 57333c62271253248bf3699be31ae7224e97de75
Parents: 660a45c
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Oct 13 11:30:39 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 13 11:30:39 2017 +0200
----------------------------------------------------------------------
.../runtime/stream/table/TableSinkITCase.scala | 18 +++++++++++++++++-
1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/57333c62/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index c5b82fe..07934b8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -51,6 +51,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
def testInsertIntoRegisteredTableSink(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -86,6 +87,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
val path = tmpFile.toURI.toString
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setParallelism(4)
@@ -109,6 +111,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
def testAppendSinkOnAppendTable(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -137,6 +140,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
def testRetractSinkOnUpdatingTable(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -168,6 +172,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
def testRetractSinkOnAppendTable(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -202,6 +207,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -236,6 +242,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -273,6 +280,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -310,6 +318,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -347,6 +356,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -384,6 +394,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
def testToAppendStreamRowtime(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.clear
@@ -478,6 +489,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test(expected = classOf[TableException])
def testToAppendStreamMultiRowtime(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -496,6 +508,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test(expected = classOf[TableException])
def testToRetractStreamMultiRowtime(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -658,8 +671,11 @@ object RowCollector {
new mutable.ArrayBuffer[JTuple2[JBool, Row]]()
def addValue(value: JTuple2[JBool, Row]): Unit = {
+
+ // make a copy
+ val copy = new JTuple2[JBool, Row](value.f0, Row.copy(value.f1))
sink.synchronized {
- sink += value
+ sink += copy
}
}